In [None]:
# import libraries

import numpy as np 
import os
from pickle import dump, load
from keras.preprocessing import image
from keras.applications.resnet import ResNet101
from keras.applications.resnet import preprocess_input
from PIL import Image
import math
import string
from keras.preprocessing.text import Tokenizer
from keras.utils import to_categorical
from keras.callbacks import ModelCheckpoint,EarlyStopping
from keras.models import Model, load_model
from keras.optimizers import SGD
from keras.utils import plot_model
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from keras import backend as K
from keras import regularizers
from keras.layers import (GRU, BatchNormalization, Dense, Dropout, Embedding,
                          Input, Lambda, TimeDistributed, RepeatVector, concatenate)
from keras.models import Model

# IMAGE PREPROCESSING

In [None]:
# extract features of images in flickr8k dataset directory

def feature_extraction(directory):
    """Run every file in ``directory`` through a headless ResNet101.

    The network is the ImageNet-pretrained ResNet101 truncated at its
    second-to-last layer, so the prediction for each image is that layer's
    activation instead of class probabilities.

    Args:
        directory: folder whose entries are all loadable image files.

    Returns:
        dict with two aligned entries: ``'ids'`` (file names without their
        extension, in ``os.listdir`` order) and ``'features'`` (the array
        returned by ``model.predict`` for the stacked images).
    """
    backbone = ResNet101(weights='imagenet')
    extractor = Model(inputs=backbone.input, outputs=backbone.layers[-2].output)

    def _load_pixels(file_name):
        # Resize to 224x224 and apply the ResNet-specific input preprocessing.
        picture = image.load_img(directory + '/' + file_name, target_size=(224, 224))
        return preprocess_input(image.img_to_array(picture))

    file_names = os.listdir(directory)
    stacked = np.array([_load_pixels(file_name) for file_name in file_names])
    # The stack must be 4-D before it is handed to predict().
    assert(len(stacked.shape)==4)
    predictions = extractor.predict(stacked, verbose=1)
    identifiers = [os.path.splitext(file_name)[0] for file_name in file_names]
    return {'ids':identifiers, 'features':predictions}

if __name__ == "__main__":
    # Extract features for the whole image folder once and cache them on disk.
    image_directory = 'Flicker8k_Dataset'
    print("extracting features...")
    features_dict = feature_extraction(image_directory)
    # The context manager guarantees the pickle is flushed and the handle closed.
    with open('features.pkl', 'wb') as features_file:
        dump(features_dict, features_file)

In [None]:
# load the extracted features

def loading_features(dict_dir, dataset_dir, repeat_times = 1):
    """Load the cached features of the images listed in a dataset split file.

    Args:
        dict_dir: path of the pickle holding a dict with aligned ``'ids'``
            (list) and ``'features'`` (2-D array) entries.
        dataset_dir: text file with one image file name per line; the
            extension is stripped to obtain the image id.
        repeat_times: how many consecutive copies of each feature row to emit
            (must be >= 1).

    Returns:
        2-D array with ``repeat_times`` rows per listed image, in file order.

    Raises:
        ValueError: if a listed image id has no entry in the pickle.
    """
    assert(repeat_times >= 1)
    with open(dataset_dir, 'r') as f:
        img_ids = [os.path.splitext(line)[0] for line in f]

    # NOTE: pickle executes arbitrary code on load -- only open trusted files.
    with open(dict_dir, 'rb') as f:
        features_dict = load(f)

    # Build the id -> row lookup once instead of scanning the id list for every
    # image; setdefault keeps the first occurrence, like list.index did.
    row_of = {}
    for row, known_id in enumerate(features_dict['ids']):
        row_of.setdefault(known_id, row)

    dataset_features = []
    for img_id in img_ids:
        if img_id not in row_of:
            raise ValueError('%r is not in the feature dictionary' % (img_id,))
        fidx = row_of[img_id]
        dataset_features.append(np.vstack([features_dict['features'][fidx, :]]*repeat_times))
    return np.vstack(dataset_features)

# extract features from an image 

def extracting_features_from_image(file_dir):
    """Compute the ResNet101 feature vector of a single image file.

    Args:
        file_dir: path of the image to encode.

    Returns:
        The prediction of the truncated ResNet101 (second-to-last layer) for a
        batch containing only this image.
    """
    # ResNet101 built with its ImageNet classification head only accepts
    # 224x224 inputs, the same size feature_extraction uses; loading at
    # 299x299 made predict() fail on a shape mismatch.
    img = image.load_img(file_dir, target_size=(224, 224))
    x = image.img_to_array(img)
    # Add the leading batch axis expected by predict().
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    base_model = ResNet101(weights='imagenet')
    model = Model(inputs=base_model.input, outputs=base_model.layers[-2].output)
    return model.predict(x)

# TEXT PREPROCESSING

In [None]:
# separate captions from 'token.txt'

def load_token_text(token_dir):
    """Group the captions found in the token file by image id.

    Every line starts with an image reference (for example ``name.jpg#0``)
    followed by the caption words. The part of the reference before its first
    ``.`` is the image id; the remaining words are re-joined with single spaces.

    Args:
        token_dir: path of the token file.

    Returns:
        dict mapping each image id to the list of its captions, in file order.
    """
    captions_by_image = {}
    with open(token_dir, 'r') as token_file:
        for raw_line in token_file:
            fields = raw_line.strip('\n').split()
            image_ref, caption_words = fields[0], fields[1:]
            image_key = image_ref.split('.')[0]
            captions_by_image.setdefault(image_key, []).append(' '.join(caption_words))
    return captions_by_image

# append captions with 'startseq' and 'endseq'

def load_dataset_token(dataset_dir, token_dir, start_end = True):
    """Collect the captions of every image listed in a dataset split file.

    Args:
        dataset_dir: text file with one image file name per line.
        token_dir: token file understood by ``load_token_text``.
        start_end: when true, wrap each caption as
            ``'startseq <caption> endseq'``.

    Returns:
        Flat list of captions: images in split-file order, and for each image
        its captions in token-file order.
    """
    captions_by_image = load_token_text(token_dir)
    with open(dataset_dir, 'r') as split_file:
        split_ids = [os.path.splitext(entry)[0] for entry in split_file]
    return [
        ('startseq ' + caption + ' endseq') if start_end else caption
        for image_key in split_ids
        for caption in captions_by_image[image_key]
    ]

#tokenize the captions

def create_tokenizer(dataset_dir, token_dir, start_end = True, use_all = False):
    """Fit a Keras ``Tokenizer`` on the captions of one dataset split.

    Args:
        dataset_dir: split file listing the images whose captions are used.
        token_dir: token file containing all captions.
        start_end: forwarded to ``load_dataset_token``.
        use_all: when true the vocabulary is never capped. The cap
            (``num_words``) is currently fixed to ``None``, so both settings
            build an uncapped tokenizer.

    Returns:
        The fitted tokenizer.
    """
    num_words = None
    captions = load_dataset_token(dataset_dir, token_dir, start_end)
    # A size-limited tokenizer is only built when a cap exists and is wanted.
    if not use_all and num_words:
        tokenizer = Tokenizer(num_words)
    else:
        tokenizer = Tokenizer()
    tokenizer.fit_on_texts(captions)
    return tokenizer

# normalise reference captions by round-tripping them through the tokenizer

def clean_test_sentences(tokenizer, sents_list):
    """Round-trip groups of sentences through the tokenizer.

    Each group is encoded with ``texts_to_sequences`` and decoded again with
    ``sequences_to_texts``, so the result contains only what the tokenizer can
    represent.

    Args:
        tokenizer: object providing ``texts_to_sequences`` and
            ``sequences_to_texts``.
        sents_list: iterable of sentence groups (one list of strings per image).

    Returns:
        List with one decoded group per input group, in the same order.
    """
    return [
        tokenizer.sequences_to_texts(tokenizer.texts_to_sequences(group))
        for group in sents_list
    ]

#BATCH GENERATOR

In [None]:
# generate batches of data for training the model

def data_generator(batch_size, max_len, tokenizer, dict_dir, dataset_dir, token_dir):
    """Endlessly yield ``([image_features, text_in, zero_state], text_out)`` batches.

    Image features are loaded with five rows per image so that they line up
    with the flat caption list (presumably five captions per image -- verify
    against the token file). For every caption, the input is the token
    sequence without its last token and the target is the sequence shifted one
    step left; both are zero-padded or truncated with respect to ``max_len``.
    Targets are one-hot encoded over the vocabulary.

    Args:
        batch_size: number of caption rows per batch (the last batch of a pass
            may be smaller).
        max_len: sequence length fed to the model.
        tokenizer: fitted tokenizer used to encode the captions.
        dict_dir: pickle with the cached image features.
        dataset_dir: split file listing the images to use.
        token_dir: token file with all captions.

    Yields:
        Tuple ``(inputs, targets)`` where ``inputs`` is the list
        ``[features, text_in, zeros(batch, unit_size)]``.
    """
    def _shifted_pair(token_ids):
        # Decoder input and next-token target for one encoded caption.
        length = len(token_ids)
        if length > max_len:
            return token_ids[:max_len], token_ids[1:max_len+1]
        padding = [0]*(max_len-length+1)
        return token_ids[:length-1] + padding, token_ids[1:] + padding

    vocab_size = tokenizer.num_words or (len(tokenizer.word_index)+1)
    img_features = loading_features(dict_dir, dataset_dir, 5)
    raw_sentences = load_dataset_token(dataset_dir, token_dir, True)
    total_rows = img_features.shape[0]
    while True:
        for start in range(0, total_rows, batch_size):
            stop = start + batch_size
            encoded = tokenizer.texts_to_sequences(raw_sentences[start:stop])
            pairs = [_shifted_pair(token_ids) for token_ids in encoded]
            X_text_mat = np.array([text_in for text_in, _ in pairs])
            Y_text_mat = to_categorical([text_out for _, text_out in pairs], vocab_size)
            # The recurrent layer always starts from an all-zero hidden state.
            zero_state = np.zeros([X_text_mat.shape[0], unit_size])
            yield ([img_features[start:stop, :], X_text_mat, zero_state],
                    Y_text_mat)

# DEFINE PAR-INJECT CONCATENATE MODEL

In [None]:
# Width shared by the image projection, the word embedding and the recurrent
# state; also read by the batch generator and the decoders.
unit_size = 512
# define the par-inject concat model

def par_concat_model(vocab_size, max_len, reg):
    """Build the par-inject (concatenate) captioning model used for training.

    At every time step the projected image vector is concatenated with the
    embedding of the current word and fed to a recurrent layer; a
    time-distributed softmax predicts the next word.

    Args:
        vocab_size: size of the softmax / embedding vocabulary.
        max_len: number of time steps of the text input.
        reg: L2 regularisation factor for the image projection and the softmax.

    Returns:
        Keras ``Model`` taking ``[image_features (2048,), token_ids (max_len,),
        initial_state (unit_size,)]`` and producing per-step word probabilities.
    """
    # Image embedding
    inputs1 = Input(shape=(2048,))
    X_img = Dropout(0.5)(inputs1)
    X_img = Dense(unit_size, use_bias = False,
                        kernel_regularizer=regularizers.l2(reg),
                        name = 'dense_img')(X_img)
    X_img = BatchNormalization(name='batch_normalization_img')(X_img)
    # Repeat the image vector so it can be injected at every time step.
    X_img = RepeatVector(max_len)(X_img)

    # Text embedding
    inputs2 = Input(shape=(max_len,))
    X_text = Embedding(vocab_size, unit_size, mask_zero = True, name = 'emb_text')(inputs2)
    X_text = Dropout(0.5)(X_text)

    # Initial state (a GRU carries a single hidden state, no cell state)
    a0 = Input(shape=(unit_size,))
    merge = concatenate([X_img, X_text])

    # The recurrent layer must be a GRU: `LSTM` is not imported in this
    # notebook, and the call below unpacks two outputs and passes one initial
    # state, which only matches GRU. The layer keeps the name 'lstm' because
    # the inference models load their GRU weights by that name.
    recurrent_layer = GRU(unit_size, return_sequences = True, return_state = True, dropout=0.5, name = 'lstm')
    A, a = recurrent_layer(merge, initial_state=[a0])
    output = TimeDistributed(Dense(vocab_size, activation='softmax',
                                     kernel_regularizer = regularizers.l2(reg),
                                     bias_regularizer = regularizers.l2(reg)), name = 'time_distributed_softmax')(A)
    return Model(inputs=[inputs1, inputs2, a0], outputs=output, name='par')


# TRAINING PHASE

In [None]:
# function for training the model

def training(dirs_dict, lr, decay, reg, batch_size, epochs, max_len, initial_epoch, previous_model = None):
    """Train the par-inject captioning model and plot its learning curves.

    Args:
        dirs_dict: dict with the keys ``'dict_dir'`` (feature pickle),
            ``'token_dir'`` (caption file), ``'train_dir'`` / ``'dev_dir'``
            (split files) and ``'params_dir'`` (folder for checkpoints).
        lr: learning rate of the SGD optimizer.
        decay: learning-rate decay of the SGD optimizer.
        reg: L2 regularisation factor passed to ``par_concat_model``.
        batch_size: training batch size.
        epochs: maximum number of epochs (early stopping may end sooner).
        max_len: caption length fed to the model.
        initial_epoch: epoch index at which training (re)starts.
        previous_model: optional weights file to resume from; layers are
            matched by name and mismatching ones are skipped.
    """
    dict_dir = dirs_dict['dict_dir']
    token_dir = dirs_dict['token_dir']
    train_dir = dirs_dict['train_dir']
    dev_dir = dirs_dict['dev_dir']
    params_dir = dirs_dict['params_dir']

    # Use Tokenizer to create vocabulary
    tokenizer = create_tokenizer(train_dir, token_dir, start_end = True)

    # loading data
    generator_train = data_generator(batch_size, max_len, tokenizer, dict_dir, train_dir, token_dir)
    generator_dev = data_generator(50, max_len, tokenizer, dict_dir, dev_dir, token_dir)

    vocab_size = tokenizer.num_words or (len(tokenizer.word_index)+1)

    # Define model
    par_model = par_concat_model(vocab_size, max_len, reg)

    if not previous_model:
        par_model.summary()
        plot_model(par_model, to_file='model.png',show_shapes=True)
    else:
        par_model.load_weights(previous_model, by_name = True, skip_mismatch=True)

    # Define checkpoint callback: weights are written after every epoch.
    file_path = params_dir + '/model-ep{epoch:03d}-loss{loss:.4f}-val_loss{val_loss:.4f}.h5'
    checkpoint = ModelCheckpoint(file_path, monitor='val_loss', verbose=1, save_weights_only = True, period=1)
    EarlyStop=EarlyStopping(monitor='val_loss',mode='min', patience=5)

    # Compile the model. The optimizer honours the caller's lr/decay; they used
    # to be silently replaced by the hard-coded 0.01 / 1e-6.
    par_model.compile(loss='categorical_crossentropy', optimizer=SGD(lr=lr, decay=decay, momentum=0.9, nesterov=True), metrics=['accuracy'])

    # training
    # NOTE(review): 30000 looks like 6000 training images x 5 captions, and
    # 100 validation steps x 50 like 1000 dev images x 5 -- confirm against
    # the split files.
    history=par_model.fit_generator(generator_train, steps_per_epoch=30000//batch_size, epochs=epochs, verbose=1,
                            callbacks=[checkpoint,EarlyStop],
                            validation_data = generator_dev, validation_steps = 100, initial_epoch = initial_epoch)

    # summarize history for loss
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

    # summarize history for accuracy
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

if __name__ == "__main__":

    dict_dir = 'features.pkl'
    train_dir = 'Flickr8k_text/Flickr_8k.trainImages.txt'
    dev_dir = 'Flickr8k_text/Flickr_8k.devImages.txt'
    token_dir = 'Flickr8k_text/Flickr8k.token.txt'
    # folder to save model weights
    params_dir = 'RESNET101/MODELS'
    # Create the checkpoint folder up front so the first epoch's weights can
    # be written; exist_ok keeps re-runs of this cell harmless.
    os.makedirs(params_dir, exist_ok=True)

    dirs_dict={'dict_dir':dict_dir, 'train_dir':train_dir, 'dev_dir':dev_dir,
                'token_dir':token_dir, 'params_dir':params_dir}
    training(dirs_dict, lr=0.01, decay=1e-6, reg = 1e-4, batch_size = 240, epochs = 2000,
             max_len = 24, initial_epoch = 0, previous_model = None)

# DECODING PHASE

# Greedy search decoding

In [None]:
# define the greedy model

def greedy_model(vocab_size, max_len):    
    """Build the inference graph that decodes a caption greedily.

    The recurrent cell is unrolled ``max_len`` times inside the graph: at each
    step the arg-max word of the softmax is embedded and fed back as the next
    text input. The named layers reuse the names given in ``par_concat_model``
    ('dense_img', 'emb_text', 'lstm', 'time_distributed_softmax',
    'batch_normalization_img').

    Args:
        vocab_size: size of the embedding / softmax vocabulary.
        max_len: number of decoding steps, i.e. number of model outputs.

    Returns:
        Keras ``Model`` taking ``[image_features (2048,), first_token (1,),
        initial_state (unit_size,)]`` and returning a list of ``max_len``
        softmax outputs, one per decoding step.
    """
    # Shared layers: each instance is created once and reused at every step.
    EncoderDense = Dense(unit_size, use_bias=False, name = 'dense_img')
    EmbeddingLayer = Embedding(vocab_size, unit_size, mask_zero = True, name = 'emb_text')
    LSTMLayer = GRU(unit_size, return_state = True, name = 'lstm')
    SoftmaxLayer = Dense(vocab_size, activation='softmax', name = 'time_distributed_softmax')
    BatchNormLayer = BatchNormalization(name='batch_normalization_img')

    # Image embedding
    inputs1 = Input(shape=(2048,))
    X_img = EncoderDense(inputs1)
    X_img = BatchNormLayer(X_img)
    # A single time step: the image vector is re-used unchanged at every step.
    X_img = RepeatVector(1)(X_img)

    # Text embedding
    inputs2 = Input(shape=(1,))
    X_text = EmbeddingLayer(inputs2)

    # Initial States
    a0 = Input(shape=(unit_size,))
    #c0 = Input(shape=(unit_size,))

    a=a0;
    outputs = []
    for i in range(max_len):
        # One decoding step: image + current word -> GRU -> word distribution.
        merge=concatenate([X_img,X_text])
        A, a = LSTMLayer(merge,initial_state=[a])
        output = SoftmaxLayer(A)
        outputs.append(output)
        # Greedy choice: the most probable word id becomes the next input.
        x = Lambda(lambda x : K.expand_dims(K.argmax(x)))(output)
        X_text= EmbeddingLayer(x)   
    return Model(inputs=[inputs1, inputs2, a0], outputs=outputs, name='par_greedy_inference_v2')

# predict words from dictionary using greedy search

def decoder_greedy(inf_model, tokenizer, features, post_process = True):
    """Decode one caption per feature row with the greedy inference model.

    Args:
        inf_model: model whose ``predict`` returns one word distribution per
            decoding step (see ``greedy_model``).
        tokenizer: fitted tokenizer providing ``word_index`` and
            ``sequences_to_texts``.
        features: non-empty 2-D array with 2048 columns, one row per image.
        post_process: when true, drop ``'endseq'`` and everything after it.

    Returns:
        The decoded sentences, one per feature row.
    """
    assert(features.shape[0]>0 and features.shape[1] == 2048)
    batch = features.shape[0]
    first_tokens = np.repeat([tokenizer.word_index['startseq']], batch)
    zero_state = np.zeros([batch, unit_size])
    #c0 = np.zeros([batch, unit_size])
    step_probs = np.array(inf_model.predict([features, first_tokens, zero_state], verbose = 1))
    # Reorder from (step, image, word) to (image, step, word) before picking
    # the most probable word of every step.
    token_ids = np.argmax(np.transpose(step_probs, axes = [1,0,2]), axis = -1)
    sents = tokenizer.sequences_to_texts(token_ids)
    if not post_process:
        return sents

    def _cut_at_end_marker(sentence):
        # Sentences without the marker are returned untouched.
        words = sentence.split()
        if 'endseq' not in words:
            return sentence
        return ' '.join(words[:words.index('endseq')])

    return [_cut_at_end_marker(sentence) for sentence in sents]

# generate captions using greedy search

def generate_captions_greedy(model_dir, tokenizer, test_references, test_features, max_len):
    """Generate one caption per test image with greedy decoding.

    Args:
        model_dir: path of the trained weights file.
        tokenizer: tokenizer fitted on the training captions.
        test_references: reference captions, one group per test image; only
            their count is checked here.
        test_features: image features, one row per test image.
        max_len: number of decoding steps.

    Returns:
        List of generated captions aligned with ``test_features``.
    """
    vocab_size = tokenizer.num_words or (len(tokenizer.word_index)+1)

    # prepare inference model
    par_inference = greedy_model(vocab_size, max_len)
    par_inference.load_weights(model_dir, by_name = True, skip_mismatch=True)
    test_candidates = decoder_greedy(par_inference, tokenizer, test_features, True)
    # The former trailing loop only built unused token lists and was removed.
    assert(len(test_references) == len(test_candidates))
    return test_candidates


# Beam search decoding

In [None]:
# define beam search model

def beamsearch_model(vocab_size):
    """Build the single-step decoder used by beam search.

    One call to the returned model advances a hypothesis by one word: image
    features, the current word and the current hidden state go in; the next
    word distribution and the updated hidden state come out. The named layers
    reuse the names given in ``par_concat_model``.

    Args:
        vocab_size: size of the embedding / softmax vocabulary.

    Returns:
        Keras ``Model`` taking ``[image_features (2048,), current_word (1,),
        state (unit_size,)]`` and returning ``[word_probabilities, new_state]``.
    """
    EmbeddingLayer = Embedding(vocab_size, unit_size, mask_zero = True, name='emb_text')
    LSTMLayer = GRU(unit_size, return_state = True, name='lstm')
    SoftmaxLayer = Dense(vocab_size, activation='softmax', name='time_distributed_softmax')
    EncoderDense = Dense(unit_size, use_bias = False, name = 'dense_img')
    BatchNormLayer = BatchNormalization(name = 'batch_normalization_img')
    
    # Image embedding (a sequence of length one)
    inputs = Input(shape=(2048,))
    X_img = EncoderDense(inputs)
    X_img = BatchNormLayer(X_img)
    X_img = RepeatVector(1)(X_img)
    
    # Text embedding of the current word
    cur_word = Input(shape=(1,))
    X_text = EmbeddingLayer(cur_word)

    # initial states
    a0 = Input(shape=(unit_size,))
    #c0 = Input(shape=(unit_size,))

    # Image and word embeddings are concatenated and run through one GRU step.
    merge=concatenate([X_img,X_text])
    A, a = LSTMLayer(merge,initial_state=[a0])
    output = SoftmaxLayer(A)
    return Model(inputs=[inputs, cur_word, a0], outputs=[output,a])

#search for words in dictionary using beam search

def beam_searching(decoder_model, features, a0 , tokenizer, beam_width, max_len):   
    """Run beam search for a single image with the one-step decoder.

    Hypothesis scores are sums of log word probabilities. When a hypothesis is
    stored in the result its score is multiplied by ``1 / len(route)**0.7``.

    Args:
        decoder_model: model from ``beamsearch_model``; ``predict`` returns
            ``(word_probabilities, new_state)``.
        features: image features for one image (one row).
        a0: initial hidden state of shape ``(1, unit_size)``.
        tokenizer: tokenizer providing ``word_index`` with 'startseq' and
            'endseq'.
        beam_width: positive int, number of hypotheses kept alive.
        max_len: positive maximum number of words per hypothesis.

    Returns:
        dict with aligned lists ``'routes'`` (word-id sequences, without the
        'endseq' id) and ``'scores'`` (their normalised scores).
    """
    assert(a0.shape == (1, unit_size) and isinstance(beam_width, int) and
             beam_width > 0 and max_len > 0)

    # === first step ====
    start_word = np.array([tokenizer.word_index['startseq']])
    output, a = decoder_model.predict([features,start_word,  a0], verbose=0)
    assert(len(output.shape)==2 and beam_width<=output.shape[1])

    # === define data structure and initialization====
    
    # Indices of the beam_width most probable first words (not sorted).
    # NOTE(review): argpartition is called with kth=beam_width, which must be
    # strictly below the vocabulary size, while the assert above also admits
    # equality -- confirm.
    seeds = np.argpartition(-output, beam_width, axis=-1)[0, :beam_width]
    start_words = np.array(seeds)
    # Every seed hypothesis starts from the state produced by the first step.
    next_activates = np.repeat(a, beam_width, axis = 0)
    #next_cells = np.repeat(c, beam_width, axis = 0)
    scores = [math.log(output[0, i]) for i in seeds]
    routes = [[i] for i in seeds]
    res = {'scores':[], 'routes':[]}

    # === search ====
    for i in range(max_len-1):
        # One feature row per live hypothesis.
        next_features = np.repeat(features,next_activates.shape[0], axis = 0)
        outputs, activations = decoder_model.predict([next_features, start_words,next_activates], 
                                                            verbose=0)
        # NOTE(review): this assignment has no effect; next_features is rebuilt
        # at the top of the next iteration.
        next_features=features
        # pick <beam_width> highest scores from every route as a candidate
        candidates = np.argpartition(-outputs, beam_width, axis=-1)[:,:beam_width]
        # r <----> i-th in scores and routes, c is the index of vocabulary
        candidates = [(r, c) for r in range(candidates.shape[0]) for c in candidates[r,:]]
        # calculate score according to the candidates
        candidates_scores = np.array([scores[r] + math.log(outputs[r, c]) for r, c in candidates])
        # consider the length of the current sentence
        #weigthed_scores = 1/(i+1)**alpha * candidates_scores
        if beam_width < len(candidates):
            choosen_candidates = np.argpartition(-candidates_scores, beam_width)[:beam_width]
        else:
            choosen_candidates = np.arange(0, len(candidates))

        # update scores, routes
        # construct new start_words, activations, cells
        start_words = []
        next_activates = []
        #next_cells = []
        updated_scores = []
        updated_routes = []
        for idx in choosen_candidates:
            r, c = candidates[idx]
            if c == tokenizer.word_index['endseq']:
                # Finished hypothesis: store the route without the end marker.
                res['routes'].append(routes[r])                
                if i != 0:
                    res['scores'].append(1/len(routes[r])**0.7 * candidates_scores[idx])
                else:
                    # Hypotheses ending in the very first search iteration are
                    # stored with the lowest possible score.
                    res['scores'].append(-math.inf)               
                # Each finished hypothesis frees one slot of the beam.
                beam_width -= 1
            else:
                start_words.append(c)
                next_activates.append(activations[r, :])
                #next_cells.append(cells[r, :])
                updated_scores.append(candidates_scores[idx])
                updated_routes.append(routes[r]+[c])

        start_words = np.array(start_words)
        next_activates = np.array(next_activates)
        #next_cells = np.array(next_cells)
        scores = updated_scores
        routes = updated_routes
        if beam_width <= 0:
            break
    # Hypotheses still alive after the loop are added with normalised scores.
    res['scores'] += [1/len(routes[i])**0.7 * scores[i] for i in range(len(scores))]
    res['routes'] += routes
    return res

# generate captions using beam search

def generate_captions_beamsearch(model_dir, tokenizer, test_references, test_features, max_len, beam_width):
    """Generate one caption per test image with beam search.

    Args:
        model_dir: path of the trained weights file.
        tokenizer: tokenizer fitted on the training captions.
        test_references: reference captions, one group per test image; only
            their count is checked here.
        test_features: image features, one row per test image.
        max_len: maximum caption length explored by the search.
        beam_width: number of hypotheses kept alive during the search.

    Returns:
        List with the best-scoring caption for every test image.
    """
    vocab_size = tokenizer.num_words or (len(tokenizer.word_index)+1)

    # prepare inference model
    beamsearching_model = beamsearch_model(vocab_size)
    beamsearching_model.load_weights(model_dir, by_name = True, skip_mismatch=True)
    feature_size = test_features.shape[0]
    a0 = np.zeros([feature_size, unit_size])

    # generate candidate sentences: one independent search per image
    test_candidates = []
    for i in range(feature_size):
        res = beam_searching(beamsearching_model, test_features[i, :].reshape(1,-1), a0[i, :].reshape(1,-1), tokenizer, beam_width, max_len)
        best_idx = np.argmax(res['scores'])
        test_candidates.append(tokenizer.sequences_to_texts([res['routes'][best_idx]])[0])
    # The former trailing loop only built unused token lists and was removed.
    assert(len(test_references) == len(test_candidates))
    return test_candidates

# SAVING THE CANDIDATE AND REFERENCE CAPTIONS FOR TESTING

In [None]:
# Decoding configuration shared by the cells below.
max_len = 24
token_dir = 'Flickr8k_text/Flickr8k.token.txt'
train_dir = 'Flickr8k_text/Flickr_8k.trainImages.txt'
# Vocabulary fitted on the training captions (with start/end markers).
tokenizer = create_tokenizer(train_dir, token_dir, start_end = True, use_all=True)
# Without a cap, the vocabulary is every known word plus the reserved index 0.
if tokenizer.num_words:
    vocab_size = tokenizer.num_words
else:
    vocab_size = len(tokenizer.word_index)+1

In [None]:
def load_filckr8k_features(dict_dir, dataset_dir): 
    """Return the image ids of a split together with their cached features.

    Args:
        dict_dir: pickle with the cached image features.
        dataset_dir: split file with one image file name per line.

    Returns:
        Tuple ``(img_ids, features)``; ids are the file names without
        extension and ``features`` holds one row per listed image.
    """
    with open(dataset_dir, 'r') as split_file:
        img_ids = [os.path.splitext(entry)[0] for entry in split_file.readlines()]
    return img_ids, loading_features(dict_dir, dataset_dir, repeat_times = 1)

# function for generating captions 

def generate_captions(model_dir, method='b', beam_width = 20):
    """Caption every test image and return ids, references and candidates.

    Args:
        model_dir: path of the trained weights file.
        method: ``'g'`` for greedy decoding, ``'b'`` for beam search.
        beam_width: beam size, only used when ``method == 'b'``.

    Returns:
        Tuple ``(test_ids, test_references, candidates)`` with aligned entries.

    Raises:
        ValueError: if ``method`` is neither ``'g'`` nor ``'b'``.
    """
    # Reject unknown methods before any expensive loading. Previously the
    # function ran to its return statement and failed on an unbound local.
    if method not in ('g', 'b'):
        raise ValueError("method must be 'g' (greedy) or 'b' (beam search), got %r" % (method,))

    dict_dir = 'features.pkl'
    train_dir = 'Flickr8k_text/Flickr_8k.trainImages.txt'
    test_dir = 'Flickr8k_text/Flickr_8k.testImages.txt'
    token_dir = 'Flickr8k_text/Flickr8k.token.txt'
    max_len = 24
    # The decoder vocabulary comes from the training captions; a second
    # tokenizer fitted on the test captions is used to clean the references.
    tokenizer = create_tokenizer(train_dir, token_dir)
    filter_tokenizer = create_tokenizer(test_dir, token_dir, use_all=True)
    test_ids, test_features = load_filckr8k_features(dict_dir, test_dir)
    all_sents = load_token_text(token_dir)
    test_references = [all_sents[test_id] for test_id in test_ids]
    test_references = clean_test_sentences(filter_tokenizer, test_references)
    if method == 'g':
        candidates = generate_captions_greedy(model_dir, tokenizer, test_references, test_features, max_len)
    else:
        candidates = generate_captions_beamsearch(model_dir, tokenizer, test_references, test_features, max_len, beam_width)
    return test_ids, test_references, candidates


In [None]:
# save captions generated using greedy search to a json file

if __name__ == '__main__':
    import json

    # Greedy decoding: candidates go to res_g.json, references to gets_g.json,
    # both keyed by image id.
    model_dir = 'RESNET101/MODELS/model_gru.h5'
    img_ids, test_references, candidates = generate_captions(model_dir, method='g')
    res = {img_id: [caption] for img_id, caption in zip(img_ids, candidates)}
    gets = dict(zip(img_ids, test_references))
    for target_path, payload in (('res_g.json', res), ('gets_g.json', gets)):
        with open(target_path, 'w') as jsonfile:
            json.dump(payload, jsonfile)
    print('Captions saved to json file')

In [None]:
# save captions generated using beam search to a json file

if __name__ == '__main__':
    import json

    # Beam search decoding: candidates go to res_b.json, references to
    # gets_b.json, both keyed by image id.
    model_dir = 'RESNET101/MODELS/model_gru.h5'
    img_ids, test_references, candidates = generate_captions(model_dir, method='b', beam_width = 20)
    res = {img_id: [caption] for img_id, caption in zip(img_ids, candidates)}
    gets = dict(zip(img_ids, test_references))
    for target_path, payload in (('res_b.json', res), ('gets_b.json', gets)):
        with open(target_path, 'w') as jsonfile:
            json.dump(payload, jsonfile)
    print('Captions saved to json file')

# BLEU (from MSCOCO evaluation server)

---



In [None]:
#!/usr/bin/env python
# bleu_scorer.py
# David Chiang <chiang@isi.edu>
# Copyright (c) 2004-2006 University of Maryland. All rights
# reserved. Do not redistribute without permission from the
# author. Not for commercial use.
# Modified by: 
# Hao Fang <hfang@uw.edu>
# Tsung-Yi Lin <tl483@cornell.edu>
'''Provides:
cook_refs(refs, n=4): Transform a list of reference sentences as strings into a form usable by cook_test().
cook_test(test, refs, n=4): Transform a test sentence as a string (together with the cooked reference sentences) into a form usable by score_cooked().
'''
import copy
import sys, math, re
from collections import defaultdict

def precook_bleu(s, n=4, out=False):
    """Count the n-grams (orders 1..n) of a whitespace-tokenised sentence.

    :param s: sentence as a single string.
    :param n: highest n-gram order to count.
    :param out: unused; kept for interface compatibility.
    :returns: tuple ``(number_of_words, counts)`` where ``counts`` maps each
        n-gram (a tuple of words) to its number of occurrences.
    """
    tokens = s.split()
    ngram_counts = defaultdict(int)
    for order in range(1, n+1):
        # Zipping the token list against its shifted copies yields every
        # window of `order` consecutive tokens, from left to right.
        shifted = [tokens[offset:] for offset in range(order)]
        for ngram in zip(*shifted):
            ngram_counts[ngram] += 1
    return (len(tokens), ngram_counts)

def cook_refs_bleu(refs, eff=None, n=4):
    """Summarise the reference sentences of one segment for BLEU.

    :param refs: list of reference sentences (strings).
    :param eff: how to collapse the reference lengths: ``"shortest"``,
        ``"average"`` or anything else to keep the full list (needed later
        for the "closest" option).
    :param n: highest n-gram order.
    :returns: tuple ``(reflen, maxcounts)``; ``maxcounts`` maps every n-gram
        to its highest count in any single reference.
    """
    cooked = [precook_bleu(ref, n) for ref in refs]
    reflen = [length for length, _ in cooked]
    maxcounts = {}
    for _, counts in cooked:
        for ngram, count in counts.items():
            maxcounts[ngram] = max(maxcounts.get(ngram,0), count)

    # Collapse the list of lengths only when an effective length is requested.
    if eff == "shortest":
        return (min(reflen), maxcounts)
    if eff == "average":
        return (float(sum(reflen))/len(reflen), maxcounts)
    return (reflen, maxcounts)

def cook_test_bleu(test, crefs, eff=None, n=4):
    """Summarise a candidate sentence against its cooked references.

    :param test: candidate sentence (string).
    :param crefs: ``(reflen, maxcounts)`` pair from ``cook_refs_bleu``.
    :param eff: ``"closest"`` to pick the reference length nearest to the
        candidate length; otherwise ``reflen`` is passed through unchanged.
    :param n: highest n-gram order.
    :returns: dict with ``"reflen"``, ``"testlen"``, ``"guess"`` (number of
        candidate n-grams per order) and ``"correct"`` (clipped matches per
        order).
    """
    reflen, refmaxcounts = crefs[0], crefs[1]
    testlen, counts = precook_bleu(test, n, True)

    if eff == "closest":
        effective_reflen = min((abs(l-testlen), l) for l in reflen)[1]
    else: ## i.e., "average" or "shortest" or None
        effective_reflen = reflen

    # Matches are clipped to the highest count seen in any single reference.
    correct = [0]*n
    for ngram, count in counts.items():
        correct[len(ngram)-1] += min(refmaxcounts.get(ngram,0), count)

    return {
        "reflen": effective_reflen,
        "testlen": testlen,
        "guess": [max(0,testlen-k+1) for k in range(1,n+1)],
        "correct": correct,
    }

class BleuScorer(object):
    """Accumulates cooked (candidate, references) pairs and computes BLEU-1..n.

    Pairs are added through the constructor or with ``scorer += (test, refs)``.
    ``compute_score`` returns ``(corpus_bleus, per_sentence_bleus)`` and caches
    the result until the scorer is modified.
    """
    __slots__ = ("n", "crefs", "ctest", "_score", "_ratio", "_testlen", "_reflen",
                 "special_reflen", "_bleu_list")
    # special_reflen is used in oracle (proportional effective ref len for a node).
    # _bleu_list caches the per-sentence scores that belong to _score.

    def copy(self):
        ''' copy the refs.'''
        new = BleuScorer(n=self.n)
        new.ctest = copy.copy(self.ctest)
        new.crefs = copy.copy(self.crefs)
        new._score = None
        return new

    def __init__(self, test=None, refs=None, n=4, special_reflen=None):
        ''' singular instance '''
        self.n = n
        self.crefs = []
        self.ctest = []
        self.cook_append(test, refs)
        self.special_reflen = special_reflen

    def cook_append(self, test, refs):
        '''called by constructor and __iadd__ to avoid creating new instances.'''

        if refs is not None:
            self.crefs.append(cook_refs_bleu(refs))
            if test is not None:
                cooked_test = cook_test_bleu(test, self.crefs[-1])
                self.ctest.append(cooked_test) ## N.B.: -1
            else:
                self.ctest.append(None) # lens of crefs and ctest have to match

        self._score = None ## need to recompute

    def ratio(self, option=None):
        '''return the corpus-level candidate/reference length ratio.'''
        self.compute_score(option=option)
        return self._ratio

    def score_ratio(self, option=None):
        '''return (bleu, len_ratio) pair'''
        # NOTE(review): `fscore` is not defined anywhere in this class, so this
        # method (and score_ratio_str) raises AttributeError -- confirm what
        # score it was meant to return before relying on it.
        return (self.fscore(option=option), self.ratio(option=option))

    def score_ratio_str(self, option=None):
        return "%.4f (%.2f)" % self.score_ratio(option)

    def reflen(self, option=None):
        '''return the accumulated effective reference length.'''
        self.compute_score(option=option)
        return self._reflen

    def testlen(self, option=None):
        '''return the accumulated candidate length.'''
        self.compute_score(option=option)
        return self._testlen

    def retest(self, new_test):
        '''replace the candidate(s) while keeping the cooked references.'''
        if type(new_test) is str:
            new_test = [new_test]
        assert len(new_test) == len(self.crefs), new_test
        self.ctest = []
        for t, rs in zip(new_test, self.crefs):
            # The cooking helper is named cook_test_bleu in this notebook; the
            # old name `cook_test` does not exist and raised NameError.
            self.ctest.append(cook_test_bleu(t, rs))
        self._score = None

        return self

    def rescore(self, new_test):
        ''' replace test(s) with new test(s), and returns the new score.'''

        return self.retest(new_test).compute_score()

    def size(self):
        assert len(self.crefs) == len(self.ctest), "refs/test mismatch! %d<>%d" % (len(self.crefs), len(self.ctest))
        return len(self.crefs)

    def __iadd__(self, other):
        '''add an instance (e.g., from another sentence).'''

        if type(other) is tuple:
            ## avoid creating new BleuScorer instances
            self.cook_append(other[0], other[1])
        else:
            assert self.compatible(other), "incompatible BLEUs."
            self.ctest.extend(other.ctest)
            self.crefs.extend(other.crefs)
            self._score = None ## need to recompute

        return self

    def compatible(self, other):
        return isinstance(other, BleuScorer) and self.n == other.n

    def single_reflen(self, option="average"):
        return self._single_reflen(self.crefs[0][0], option)

    def _single_reflen(self, reflens, option=None, testlen=None):
        '''collapse a list of reference lengths into one effective length.'''

        if option == "shortest":
            reflen = min(reflens)
        elif option == "average":
            reflen = float(sum(reflens))/len(reflens)
        elif option == "closest":
            reflen = min((abs(l-testlen), l) for l in reflens)[1]
        else:
            assert False, "unsupported reflen option %s" % option

        return reflen

    def recompute_score(self, option=None, verbose=0):
        self._score = None
        return self.compute_score(option, verbose)

    def compute_score(self, option=None, verbose=0):
        '''compute corpus-level and per-sentence BLEU for orders 1..n.

        option selects the effective reference length ("shortest", "average"
        or "closest"); when omitted it is "average" for a single segment and
        "closest" otherwise. verbose is accepted but unused.

        Returns a tuple (bleus, bleu_list): bleus[k] is the corpus BLEU-(k+1)
        and bleu_list[k] the list of per-sentence BLEU-(k+1) values. A cached
        result is returned in the same shape.
        '''
        n = self.n
        small = 1e-9
        tiny = 1e-15 ## so that if guess is 0 still return 0
        bleu_list = [[] for _ in range(n)]

        if self._score is not None:
            # Return the cached pair; the cached path used to return only the
            # corpus scores, which broke callers unpacking two values.
            return self._score, self._bleu_list

        if option is None:
            option = "average" if len(self.crefs) == 1 else "closest"

        self._testlen = 0
        self._reflen = 0
        totalcomps = {'testlen':0, 'reflen':0, 'guess':[0]*n, 'correct':[0]*n}

        # for each sentence
        for comps in self.ctest:
            testlen = comps['testlen']
            self._testlen += testlen

            if self.special_reflen is None: ## need computation
                reflen = self._single_reflen(comps['reflen'], option, testlen)
            else:
                reflen = self.special_reflen

            self._reflen += reflen

            for key in ['guess','correct']:
                for k in range(n):
                    totalcomps[key][k] += comps[key][k]

            # append per image bleu score
            bleu = 1.
            for k in range(n):
                bleu *= (float(comps['correct'][k]) + tiny) \
                        /(float(comps['guess'][k]) + small)
                bleu_list[k].append(bleu ** (1./(k+1)))
            ratio = (testlen + tiny) / (reflen + small) ## N.B.: avoid zero division
            if ratio < 1:
                # brevity penalty for candidates shorter than the reference
                for k in range(n):
                    bleu_list[k][-1] *= math.exp(1 - 1/ratio)

        totalcomps['reflen'] = self._reflen
        totalcomps['testlen'] = self._testlen

        bleus = []
        bleu = 1.
        for k in range(n):
            bleu *= float(totalcomps['correct'][k] + tiny) \
                    / (totalcomps['guess'][k] + small)
            bleus.append(bleu ** (1./(k+1)))
        ratio = (self._testlen + tiny) / (self._reflen + small) ## N.B.: avoid zero division
        if ratio < 1:
            for k in range(n):
                bleus[k] *= math.exp(1 - 1/ratio)

        # Store the corpus-level ratio so ratio() can return it; it was never
        # assigned before.
        self._ratio = ratio
        self._bleu_list = bleu_list
        self._score = bleus
        return self._score, bleu_list


In [None]:
#!/usr/bin/env python
# 
# File Name : bleu.py
#
# Description : Wrapper for BLEU scorer.
#
# Creation Date : 06-01-2015
# Last Modified : Thu 19 Mar 2015 09:13:28 PM PDT
# Authors : Hao Fang <hfang@uw.edu> and Tsung-Yi Lin <tl483@cornell.edu>
#from bleu_scorer import BleuScorer

class Bleu:
    """Thin wrapper that feeds image-keyed captions into a ``BleuScorer``."""

    def __init__(self, n=4):
        # By default BLEU is computed for the orders 1 to 4.
        self._n = n
        self._hypo_for_image = {}
        self.ref_for_image = {}

    def compute_score(self, gts, res):
        """Score candidates against references.

        :param gts: dict mapping image id to a non-empty list of references.
        :param res: dict with the same keys mapping to a one-element list
            holding the candidate caption.
        :returns: ``(score, scores)`` as returned by
            ``BleuScorer.compute_score`` with the "closest" length option.
        """
        assert(gts.keys() == res.keys())
        bleu_scorer = BleuScorer(n=self._n)
        for image_id, references in gts.items():
            hypothesis = res[image_id]

            # Sanity check.
            assert(type(hypothesis) is list)
            assert(len(hypothesis) == 1)
            assert(type(references) is list)
            assert(len(references) >= 1)
            bleu_scorer += (hypothesis[0], references)

        score, scores = bleu_scorer.compute_score(option='closest', verbose=1)
        return score, scores

    def method(self):
        """Name of the metric."""
        return "Bleu"

In [None]:
def bleu(gts=None, res=None):
    """Print BLEU-1 to BLEU-4 for a set of candidate captions.

    Args:
        gts: dict mapping image id to its list of reference captions. When
            omitted, the module-level variable ``gts`` is used, as before.
        res: dict mapping image id to a one-element list with the candidate
            caption. When omitted, the module-level variable ``res`` is used.

    Raises:
        NameError: if an argument is omitted and no module-level variable of
            that name exists. Note that the saving cells of this notebook keep
            the references in a variable called ``gets``, so passing both
            dictionaries explicitly is the reliable way to call this function.
    """
    module_scope = globals()
    if gts is None:
        if 'gts' not in module_scope:
            raise NameError("name 'gts' is not defined; pass the references explicitly")
        gts = module_scope['gts']
    if res is None:
        if 'res' not in module_scope:
            raise NameError("name 'res' is not defined; pass the candidates explicitly")
        res = module_scope['res']

    scorer = Bleu(n=4)
    # score holds the corpus-level values; the per-image values are not printed.
    score, scores = scorer.compute_score(gts, res)
    print('BLEU-1 = %s' % score[0])
    print('BLEU-2 = %s' % score[1])
    print('BLEU-3 = %s' % score[2])
    print('BLEU-4 = %s' % score[3])

# ROUGE-L (from MSCOCO evaluation server)

In [None]:
#!/usr/bin/env python
# File Name : rouge.py
# Description : Computes ROUGE-L metric as described by Lin and Hovey (2004)
# Creation Date : 2015-01-07 06:03
# Author : Ramakrishna Vedantam <vrama91@vt.edu>

import numpy as np
import pdb

def my_lcs(string, sub):
    """
    Length of the longest common subsequence of two token sequences.

    :param string : list of str : tokens of the first sentence
    :param sub : list of str : tokens of the second sentence (the arguments are
                               swapped internally if this one is longer)
    :returns: int : length of the longest common subsequence

    Note: only the length is returned, not the subsequence itself.
    """
    # Keep the longer sequence in `string`.
    if len(sub) > len(string):
        string, sub = sub, string

    n_rows, n_cols = len(string) + 1, len(sub) + 1
    # table[r][c] = LCS length of string[:r] and sub[:c]; row/column 0 stay 0.
    table = [[0] * n_cols for _ in range(n_rows)]

    for row in range(1, n_rows):
        for col in range(1, n_cols):
            if string[row - 1] == sub[col - 1]:
                table[row][col] = table[row - 1][col - 1] + 1
            else:
                table[row][col] = max(table[row - 1][col], table[row][col - 1])

    return table[-1][-1]

class Rouge():
    '''
    Class for computing ROUGE-L score for a set of candidate sentences for the MS COCO test set

    '''
    def __init__(self):
        # vrama91: updated the value below based on discussion with Hovey
        # beta weights recall against precision in the F-measure below.
        self.beta = 1.2

    def calc_score(self, candidate, refs):
        """
        Compute ROUGE-L score given one candidate and references for an image
        :param candidate: list of str : one-element list holding the candidate sentence
        :param refs: list of str : reference sentences for the image; must be non-empty
        :returns score: float (ROUGE-L F-score of the candidate against the references)
        :raises ValueError: if refs is empty (max() of an empty list)
        """
        prec = []
        rec = []

        # split into tokens
        token_c = candidate[0].split(" ")

        for reference in refs:
            # split into tokens
            token_r = reference.split(" ")
            # compute the longest common subsequence
            lcs = my_lcs(token_r, token_c)
            prec.append(lcs/float(len(token_c)))
            rec.append(lcs/float(len(token_r)))

        # Best precision and best recall are taken independently over the references.
        prec_max = max(prec)
        rec_max = max(rec)

        if(prec_max!=0 and rec_max !=0):
            score = ((1 + self.beta**2)*prec_max*rec_max)/float(rec_max + self.beta**2*prec_max)
        else:
            score = 0.0
        return score

    def compute_score(self, gts, res):
        """
        Computes Rouge-L score given a set of reference and candidate sentences for the dataset
        :param gts: dict : reference sentences, image id -> non-empty list of str
        :param res: dict : candidate sentences, image id -> one-element list of str
        :returns: (average_score, scores) : float mean ROUGE-L over all images and
                  a numpy array with the per-image scores
        :raises AssertionError: if the key sets differ or an entry has the wrong shape
        """
        assert(gts.keys() == res.keys())
        imgIds = gts.keys()

        score = []
        for id in imgIds:
            hypo = res[id]
            ref  = gts[id]

            # Sanity check BEFORE scoring, so malformed input fails with the
            # intended assertion instead of an unrelated error inside calc_score
            # (e.g. max() on an empty reference list).
            assert(type(hypo) is list)
            assert(len(hypo) == 1)
            assert(type(ref) is list)
            assert(len(ref) > 0)

            score.append(self.calc_score(hypo, ref))

        average_score = np.mean(np.array(score))
        return average_score, np.array(score)

    def method(self):
        """Return the display name of this metric."""
        return "Rouge"


In [None]:
def rouge():
    """Print the corpus-level ROUGE-L score for the module-level ``gts`` / ``res`` dicts.

    ``gts`` and ``res`` are read as globals; they must be assigned before this
    function is called.
    """
    corpus_score, _per_image = Rouge().compute_score(gts, res)
    print('ROUGE-L = %s' % corpus_score)

# CIDEr-D (from MSCOCO evaluation server)

In [None]:
#!/usr/bin/env python
# Tsung-Yi Lin <tl483@cornell.edu>
# Ramakrishna Vedantam <vrama91@vt.edu>

import copy
from collections import defaultdict
import numpy as np
import pdb
import math

def precook_cider(s, n=4, out=False):
    """
    Count every 1-gram .. n-gram of a whitespace-tokenised sentence.

    :param s: string : sentence to be converted into n-grams
    :param n: int    : highest n-gram order to count
    :param out: unused; accepted so existing callers keep working
    :return: defaultdict(int) mapping n-gram tuple -> number of occurrences
    """
    tokens = s.split()
    ngram_counts = defaultdict(int)
    for order in range(1, n + 1):
        # Zipping `order` staggered views of the token list yields each run of
        # `order` consecutive tokens, left to right.
        staggered = [tokens[offset:] for offset in range(order)]
        for gram in zip(*staggered):
            ngram_counts[gram] += 1
    return ngram_counts

def cook_refs_cider(refs, n=4):
    '''Convert the reference sentences of one image into n-gram count tables.

    :param refs: list of string : reference sentences for some image
    :param n: int : highest n-gram order to count
    :return: list with one ``precook_cider`` result per reference, in input order
    '''
    cooked = []
    for sentence in refs:
        cooked.append(precook_cider(sentence, n))
    return cooked

def cook_test_cider(test, n=4):
    '''Convert one hypothesis sentence into its n-gram count table.

    :param test: string : hypothesis sentence for some image
    :param n: int : highest n-gram order to count
    :return: the ``precook_cider`` result for the sentence
    '''
    cooked = precook_cider(test, n, True)
    return cooked

class CiderScorer(object):
    """CIDEr scorer.

    Collects one (hypothesis, references) entry per image, then scores each
    hypothesis by TF-IDF weighted n-gram cosine similarity against its
    references, with clipping and a Gaussian length penalty.
    """

    def copy(self):
        ''' copy the refs.'''
        # Carry sigma over as well, so the copy scores exactly like the original.
        new = CiderScorer(n=self.n, sigma=self.sigma)
        new.ctest = copy.copy(self.ctest)
        new.crefs = copy.copy(self.crefs)
        return new

    def __init__(self, test=None, refs=None, n=4, sigma=6.0):
        '''
        :param test: optional hypothesis sentence to add immediately
        :param refs: optional list of reference sentences to add immediately
        :param n: highest n-gram order scored
        :param sigma: standard deviation of the Gaussian length penalty
        '''
        self.n = n
        self.sigma = sigma
        self.crefs = []   # per image: list of n-gram count tables, one per reference
        self.ctest = []   # per image: n-gram count table of the hypothesis (or None)
        self.document_frequency = defaultdict(float)
        self.ref_len = None
        self.cook_append(test, refs)

    def cook_append(self, test, refs):
        '''called by constructor and __iadd__ to avoid creating new instances.'''

        if refs is not None:
            # Cook with this scorer's own n so the count tables always match
            # the number of n-gram orders compute_cider iterates over.
            self.crefs.append(cook_refs_cider(refs, n=self.n))
            if test is not None:
                self.ctest.append(cook_test_cider(test, n=self.n))
            else:
                self.ctest.append(None) # lens of crefs and ctest have to match

    def size(self):
        '''Return the number of accumulated images.'''
        assert len(self.crefs) == len(self.ctest), "refs/test mismatch! %d<>%d" % (len(self.crefs), len(self.ctest))
        return len(self.crefs)

    def __iadd__(self, other):
        '''add an instance (e.g., from another sentence).

        ``other`` is either a ``(test, refs)`` tuple of raw sentences or another
        scorer whose already-cooked entries are appended.
        '''

        if type(other) is tuple:
            ## avoid creating new CiderScorer instances
            self.cook_append(other[0], other[1])
        else:
            self.ctest.extend(other.ctest)
            self.crefs.extend(other.crefs)

        return self

    def compute_doc_freq(self):
        '''
        Compute document frequency for reference data.
        This will be used to compute idf (inverse document frequency later)
        The document frequency is stored in the object
        :return: None
        '''
        # Start from scratch on every call: accumulating across repeated
        # compute_score() calls would inflate the counts and change the scores.
        self.document_frequency = defaultdict(float)
        for refs in self.crefs:
            # refs, k ref captions of one image; each distinct n-gram counts
            # once per image no matter how many references contain it
            for ngram in set([ngram for ref in refs for (ngram,count) in ref.items()]):
                self.document_frequency[ngram] += 1

    def compute_cider(self):
        '''Return the list of per-image CIDEr scores (requires compute_doc_freq first).'''
        def counts2vec(cnts):
            """
            Function maps counts of ngram to vector of tfidf weights.
            The function returns vec, an array of dictionary that store mapping of n-gram and tf-idf weights.
            The n-th entry of array denotes length of n-grams.
            :param cnts: dict mapping n-gram tuple -> count
            :return: vec (array of dict), norm (array of float), length (int)
            """
            vec = [defaultdict(float) for _ in range(self.n)]
            length = 0
            norm = [0.0 for _ in range(self.n)]
            for (ngram, term_freq) in cnts.items():
                # give word count 1 if it doesn't appear in reference corpus
                df = np.log(max(1.0, self.document_frequency[ngram]))
                # ngram index
                n = len(ngram)-1
                # tf (term_freq) * idf (precomputed idf) for n-grams
                vec[n][ngram] = float(term_freq)*(self.ref_len - df)
                # compute norm for the vector.  the norm will be used for computing similarity
                norm[n] += pow(vec[n][ngram], 2)

                # NOTE(review): index 1 is the bigram slot, so `length` is the
                # number of bigram occurrences rather than the word count;
                # kept as-is so scores stay unchanged.
                if n == 1:
                    length += term_freq
            norm = [np.sqrt(n) for n in norm]
            return vec, norm, length

        def sim(vec_hyp, vec_ref, norm_hyp, norm_ref, length_hyp, length_ref):
            '''
            Compute the cosine similarity of two vectors.
            :param vec_hyp: array of dictionary for vector corresponding to hypothesis
            :param vec_ref: array of dictionary for vector corresponding to reference
            :param norm_hyp: array of float for vector corresponding to hypothesis
            :param norm_ref: array of float for vector corresponding to reference
            :param length_hyp: int containing length of hypothesis
            :param length_ref: int containing length of reference
            :return: array of score for each n-grams cosine similarity
            '''
            delta = float(length_hyp - length_ref)
            # measure cosine similarity
            val = np.array([0.0 for _ in range(self.n)])
            for n in range(self.n):
                # ngram
                for (ngram, weight_hyp) in vec_hyp[n].items():
                    # vrama91 : added clipping
                    val[n] += min(weight_hyp, vec_ref[n][ngram]) * vec_ref[n][ngram]

                # Skip the normalisation when either vector is all-zero, which
                # leaves val[n] at 0 instead of dividing by zero.
                if (norm_hyp[n] != 0) and (norm_ref[n] != 0):
                    val[n] /= (norm_hyp[n]*norm_ref[n])

                assert(not math.isnan(val[n]))
                # vrama91: added a length based gaussian penalty
                val[n] *= np.e**(-(delta**2)/(2*self.sigma**2))
            return val

        # compute log reference length
        self.ref_len = np.log(float(len(self.crefs)))
        # With a single image log(1) == 0 would make every tf-idf weight zero,
        # so 1 is used instead.
        if len(self.crefs) == 1:
            self.ref_len = 1
        scores = []
        for test, refs in zip(self.ctest, self.crefs):
            # compute vector for test captions
            vec, norm, length = counts2vec(test)
            # compute vector for ref captions
            score = np.array([0.0 for _ in range(self.n)])
            for ref in refs:
                vec_ref, norm_ref, length_ref = counts2vec(ref)
                score += sim(vec, vec_ref, norm, norm_ref, length, length_ref)
            # change by vrama91 - mean of ngram scores, instead of sum
            score_avg = np.mean(score)
            # divide by number of references
            score_avg /= len(refs)
            # multiply score by 10
            score_avg *= 10.0
            # append score of an image to the score list
            scores.append(score_avg)
        return scores

    def compute_score(self, option=None, verbose=0):
        '''
        Score every accumulated image.
        :param option: unused
        :param verbose: unused
        :return: (mean score over images, numpy array of per-image scores)
        '''
        # compute idf
        self.compute_doc_freq()
        # an n-gram cannot occur in more images than there are images
        assert(len(self.ctest) >= max(self.document_frequency.values()))
        # compute cider score
        score = self.compute_cider()
        return np.mean(np.array(score)), np.array(score)

In [None]:
# Filename: cider.py
# Description: Describes the class to compute the CIDEr (Consensus-Based Image Description Evaluation) Metric 
#               by Vedantam, Zitnick, and Parikh (http://arxiv.org/abs/1411.5726)
# Creation Date: Sun Feb  8 14:16:54 2015
# Authors: Ramakrishna Vedantam <vrama91@vt.edu> and Tsung-Yi Lin <tl483@cornell.edu>
#from cider_scorer import CiderScorer
import pdb

class Cider:
    """
    Entry point for the CIDEr metric: validates the caption dictionaries and
    delegates the scoring to ``CiderScorer``.
    """

    def __init__(self, test=None, refs=None, n=4, sigma=6.0):
        # `test` and `refs` are accepted but not used by this wrapper.
        self._n = n          # highest n-gram order (1 .. n are averaged)
        self._sigma = sigma  # standard deviation of the Gaussian length penalty

    def compute_score(self, gts, res):
        """
        Score every candidate caption against its references.

        :param gts: dict : image id -> non-empty list of reference sentences
        :param res: dict : image id -> one-element list holding the candidate sentence
        :return: the ``(score, scores)`` pair returned by ``CiderScorer.compute_score()``
        :raises AssertionError: if the key sets differ or an entry has the wrong shape
        """
        assert gts.keys() == res.keys()

        accumulator = CiderScorer(n=self._n, sigma=self._sigma)
        for image_key in gts.keys():
            candidates, references = res[image_key], gts[image_key]

            # Shape checks: exactly one candidate, at least one reference.
            assert type(candidates) is list
            assert len(candidates) == 1
            assert type(references) is list
            assert len(references) > 0

            accumulator += (candidates[0], references)

        corpus_cider, per_image_cider = accumulator.compute_score()
        return corpus_cider, per_image_cider

    def method(self):
        """Return the display name of this metric."""
        return "CIDEr"

In [None]:
def cider():
    """Print the corpus-level CIDEr-D score for the module-level ``gts`` / ``res`` dicts.

    ``gts`` and ``res`` are read as globals; they must be assigned before this
    function is called.
    """
    corpus_score, _per_image = Cider().compute_score(gts, res)
    print('CIDER-D = %s' % corpus_score)

# SCORES FOR CAPTIONS GENERATED BY GREEDY SEARCH

In [None]:
import json
# Load the two caption dictionaries for the greedy-search run.
# `gts` and `res` are module-level names: bleu(), rouge() and cider() read them
# as globals, so they must be assigned before those functions are called.
# The scorers assert that both dicts have identical keys, that every `res`
# value is a one-element list and every `gts` value is a non-empty list.
# NOTE(review): presumably gts holds the reference captions and res the
# generated ones, keyed by image id -- confirm against the cell that writes
# these JSON files.
with open('/content/gets_g.json', 'r') as file:
    gts = json.load(file)
with open('/content/res_g.json', 'r') as file:
    res = json.load(file)
print('Scores for captions generated by greedy search')
# Each helper prints its own metric line(s).
bleu()
rouge()
cider()

# SCORES FOR CAPTIONS GENERATED BY BEAM SEARCH

In [None]:
import json
# Load the two caption dictionaries for the beam-search run.
# This rebinds the module-level `gts` and `res` that bleu(), rouge() and
# cider() read as globals, replacing whatever an earlier cell assigned.
# The scorers assert that both dicts have identical keys, that every `res`
# value is a one-element list and every `gts` value is a non-empty list.
# NOTE(review): presumably gts holds the reference captions and res the
# generated ones, keyed by image id -- confirm against the cell that writes
# these JSON files.
with open('/content/gets_b.json', 'r') as file:
    gts = json.load(file)
with open('/content/res_b.json', 'r') as file:
    res = json.load(file)
print('Scores for captions generated by beam search')
# Each helper prints its own metric line(s).
bleu()
rouge()
cider()

# GENERATE CAPTIONS FOR NEW IMAGES

In [None]:
# Paths and settings shared by the greedy and beam-search inference cells below.
train_dir = 'Flickr8k_text/Flickr_8k.trainImages.txt'
token_dir = 'Flickr8k_text/Flickr8k.token.txt'
model_dir = 'RESNET101/MODELS/model_gru.h5'
# create_tokenizer is defined in an earlier cell.
# NOTE(review): presumably this rebuilds the tokenizer used at training time -- confirm.
tokenizer = create_tokenizer(train_dir, token_dir, start_end = True, use_all=True)
# Use tokenizer.num_words when it is set (truthy); otherwise fall back to the
# size of word_index plus one.
# NOTE(review): the +1 presumably accounts for the reserved index 0 -- confirm.
vocab_size  = tokenizer.num_words or (len(tokenizer.word_index)+1)
# NOTE(review): presumably the maximum caption length the model was trained with -- TODO confirm.
max_len = 24 

# Greedy search decoding

In [None]:
# generate captions for test images using greedy search
# the images are to be kept in a directory

# Build the greedy-decoding inference model (greedy_model is defined in an
# earlier cell) and load the trained weights from `model_dir`.
# by_name=True matches weights to layers by layer name; skip_mismatch=True
# skips layers whose weight shapes do not match instead of raising
# (see the Keras load_weights documentation).
par_inference = greedy_model(vocab_size, max_len)
par_inference.load_weights(model_dir, by_name = True, skip_mismatch=True)

def generate_caption_from_directory(file_directory):
    """Caption every image in ``file_directory`` with greedy decoding.

    Uses the module-level ``par_inference`` model and ``tokenizer``.

    :param file_directory: path of the folder holding the images
    :returns: ``(ids, captions)`` -- the ``'ids'`` list returned by
              ``feature_extraction`` and the output of ``decoder_greedy``
    """
    # Encoder: image features for the whole directory.
    extracted = feature_extraction(file_directory)
    image_features = extracted['features']
    # Decoder: greedy search over the extracted features.
    decoded = decoder_greedy(par_inference, tokenizer, image_features, True)
    return extracted['ids'], decoded

image_dir = 'test_images' # folder containing the test images
# Caption the whole folder once, then show each image followed by its caption.
img_names, captions = generate_caption_from_directory(image_dir)
for img_file in os.listdir(image_dir):
    img = mpimg.imread(image_dir + '/' + img_file)
    plt.imshow(img)    
    # feature_extraction stores ids as file names without extension, so the
    # same stem is used to look up this image's caption.
    img_name = os.path.splitext(img_file)[0]
    idx = img_names.index(img_name)    
    plt.show()
    print(captions[idx])

# Beam search decoding

In [None]:
# generate captions for test images using beam search
# the images are to be kept in a directory

# Build the beam-search inference model (beamsearch_model is defined in an
# earlier cell) and load the trained weights from `model_dir`.
# by_name=True matches weights to layers by layer name; skip_mismatch=True
# skips layers whose weight shapes do not match instead of raising
# (see the Keras load_weights documentation).
beamsearching_model = beamsearch_model(vocab_size)
beamsearching_model.load_weights(model_dir, by_name = True, skip_mismatch=True)

def generate_caption_from_directory(file_directory, beam_width = 20):
    """Caption every image in ``file_directory`` with beam-search decoding.

    Uses the module-level ``beamsearching_model``, ``tokenizer`` and ``max_len``.

    :param file_directory: path of the folder holding the images
    :param beam_width: beam width handed to ``beam_searching``
    :returns: ``(ids, captions)`` -- the ``'ids'`` list returned by
              ``feature_extraction`` and one decoded caption per image
    """
    # Encoder: image features for the whole directory.
    extracted = feature_extraction(file_directory)
    feature_rows = extracted['features']
    num_images = feature_rows.shape[0]

    # One all-zero initial state row per image.
    # NOTE(review): 512 presumably matches the decoder's recurrent state size --
    # confirm against beamsearch_model.
    initial_states = np.zeros([num_images, 512])

    # Decoder: run beam search image by image and keep the best-scoring route.
    decoded = []
    for row in range(num_images):
        search = beam_searching(
            beamsearching_model,
            feature_rows[row, :].reshape(1, -1),
            initial_states[row, :].reshape(1, -1),
            tokenizer,
            beam_width,
            max_len,
        )
        top = np.argmax(search['scores'])
        best_route = search['routes'][top]
        decoded.append(tokenizer.sequences_to_texts([best_route])[0])
    return extracted['ids'], decoded

image_dir = 'test_images' # folder containing test images
# Caption the whole folder once with beam width 20, then show each image
# followed by its caption.
img_names, captions = generate_caption_from_directory(image_dir, 20)
for img_file in os.listdir(image_dir):
    img = mpimg.imread(image_dir + '/' + img_file)
    plt.imshow(img)   
    # feature_extraction stores ids as file names without extension, so the
    # same stem is used to look up this image's caption.
    img_name = os.path.splitext(img_file)[0]
    idx = img_names.index(img_name)    
    plt.show()
    print(captions[idx])