In [6]:
from os import listdir
from pickle import dump, load
import pandas as pd
from sklearn.model_selection import train_test_split
from keras_preprocessing.text import Tokenizer
# from keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences

# from keras.layers import Dropout, Embedding, LSTM, Dense, Input
from tensorflow.keras.layers import Dropout, Embedding, LSTM, Dense, Input, add
from keras.models import Model, Sequential

import numpy as np
import tensorflow as tf
from nltk.translate.bleu_score import corpus_bleu



RANDOM_SEED = 42

In [7]:
import tensorflow as tf
tf.__version__

# import keras
# keras.__version__

'2.7.0'

In [8]:
captions = pd.read_csv('data/flickr_8k/captions.txt')

In [9]:
def clean_description(text):
    '''
    returns new array of tokens representing the text

    - lowercased
    - removes 1 - letter punctuation
    - removes numbers
    - appends 's to previous words
    - reconstructs string

    <start> is appended to the start
    <end> is appended to the end

    Notes:
    maybe keep in numbers
    maybe remove all dashes 
    '''
    output = []

    text = text.lower().replace('"', '')
    
    tokens = text.split()
    for token in tokens:
        if token.isalpha() or ((not token.isalpha() and len(token) > 1) and not token.isnumeric()):
            output.append(token)

    for i, token in enumerate(output):
        if token == "'s":
            output[i-1] = output[i-1] + "'s"
            output.remove("'s")

        if len(token) == 2 and '.' in token:
            output[i] = token.replace('.', '')
    
    output = ['<start>'] + output + ['<end>']

    return output

def clean_descriptions(filename):
    data = pd.read_csv(filename)
    data['caption'] = data['caption'].apply(lambda caption: clean_description(caption))
    return data

In [10]:
cleaned_data = clean_descriptions('data/flickr_8k/captions.txt')
print(cleaned_data)

                           image  \
0      1000268201_693b08cb0e.jpg   
1      1000268201_693b08cb0e.jpg   
2      1000268201_693b08cb0e.jpg   
3      1000268201_693b08cb0e.jpg   
4      1000268201_693b08cb0e.jpg   
...                          ...   
40450   997722733_0cb5439472.jpg   
40451   997722733_0cb5439472.jpg   
40452   997722733_0cb5439472.jpg   
40453   997722733_0cb5439472.jpg   
40454   997722733_0cb5439472.jpg   

                                                 caption  
0      [<start>, a, child, in, a, pink, dress, is, cl...  
1      [<start>, a, girl, going, into, a, wooden, bui...  
2      [<start>, a, little, girl, climbing, into, a, ...  
3      [<start>, a, little, girl, climbing, the, stai...  
4      [<start>, a, little, girl, in, a, pink, dress,...  
...                                                  ...  
40450  [<start>, a, man, in, a, pink, shirt, climbs, ...  
40451  [<start>, a, man, is, rock, climbing, high, in...  
40452  [<start>, a, person, in, a, r

In [11]:
all_filenames = list(set(cleaned_data['image']))
train_filenames, test_filenames = train_test_split(all_filenames, test_size=0.2, random_state=RANDOM_SEED)
test_filenames, validation_filenames = train_test_split(test_filenames, test_size=0.5, random_state=RANDOM_SEED)

training_samples = cleaned_data.loc[cleaned_data['image'].isin(train_filenames)]
validation_samples = cleaned_data.loc[cleaned_data['image'].isin(validation_filenames)]
test_samples = cleaned_data.loc[cleaned_data['image'].isin(test_filenames)]

In [12]:
training_samples

Unnamed: 0,image,caption
0,1000268201_693b08cb0e.jpg,"[<start>, a, child, in, a, pink, dress, is, cl..."
1,1000268201_693b08cb0e.jpg,"[<start>, a, girl, going, into, a, wooden, bui..."
2,1000268201_693b08cb0e.jpg,"[<start>, a, little, girl, climbing, into, a, ..."
3,1000268201_693b08cb0e.jpg,"[<start>, a, little, girl, climbing, the, stai..."
4,1000268201_693b08cb0e.jpg,"[<start>, a, little, girl, in, a, pink, dress,..."
...,...,...
40450,997722733_0cb5439472.jpg,"[<start>, a, man, in, a, pink, shirt, climbs, ..."
40451,997722733_0cb5439472.jpg,"[<start>, a, man, is, rock, climbing, high, in..."
40452,997722733_0cb5439472.jpg,"[<start>, a, person, in, a, red, shirt, climbi..."
40453,997722733_0cb5439472.jpg,"[<start>, a, rock, climber, in, a, red, shirt,..."


In [13]:
tokenizer = Tokenizer()
tokenizer.fit_on_texts(list(training_samples['caption']))
VOCAB_SIZE = len(tokenizer.word_index) + 1
print(VOCAB_SIZE)

8163


In [14]:
def samples_to_dict(samples):
	
	# TODO comment

	descriptions = dict()
	for image, caption in zip(samples['image'], samples['caption']):
		if image not in descriptions.keys():
			descriptions[image] = [caption]
		else:
			descriptions[image].append(caption)	

		
	return descriptions

training_dict = samples_to_dict(training_samples)
validation_dict = samples_to_dict(validation_samples)
test_dict = samples_to_dict(test_samples)

In [15]:
def import_image_features(features_file, corresponding_filenames):
    '''
    from our stored pkl file of extracted VGG features,
    given the pkl file name, and a list of photo filenames
    return a dictionary from filename to VGG features
    '''
    # import all features from pkl file
    all_features = load(open(features_file, 'rb'))
    
    # get a dictionary from filename to image features
    # splits filename at '.' because the pkl file doesnt store the .jpg part of the filename
    features = {filename: all_features[filename.split('.')[0]] for filename in corresponding_filenames}
    
    return features

In [16]:
train_image_features = import_image_features('8k_features.pkl', train_filenames)
test_image_features = import_image_features('8k_features.pkl', test_filenames)
val_image_features = import_image_features('8k_features.pkl', validation_filenames)

In [17]:
MAX_LENGTH = max(training_samples['caption'].apply(lambda caption : len(caption)))

In [18]:
# TODO try W2V embeddings later if model sucks

def dictionary_to_model_samples(dictionary, image_features, max_length):
    # list of image features concatenated with corresponding first i-1 words
    all_Xs = [[] for i in range(max_length)]

    # next word for corresponding sentence
    all_ys = [[] for i in range(max_length)]

    for filename, samples in dictionary.items():
        samples = tokenizer.texts_to_sequences(samples)
        for sample in samples:
            for i in range(len(sample) - 1):

                if i > max_length - 1:
                    break 

                x1 = image_features[filename].reshape(-1,)
                x2 = np.array(sample[:i + 1])

                combined_X = np.concatenate([x1, x2])

                y = to_categorical(sample[i+1], VOCAB_SIZE)
            
                all_Xs[i].append(combined_X)
                all_ys[i].append(y)


    all_Xs = [tf.convert_to_tensor(samples) for samples in all_Xs]
    all_ys = [tf.convert_to_tensor(samples) for samples in all_ys]
    
    return all_Xs, all_ys

In [40]:
# TODO change and comment

def create_sequences(image_features, descriptions, desired_caption_size):
    '''
    given 5 descriptions corresponding to one image, output a list of:
    (image feature vector, first i-1 words in a sequence, ith word) 
    '''
    X = []
    # next word
    y = []

    image_features = image_features.reshape(-1,)
                                
    descriptions = tokenizer.texts_to_sequences(descriptions)
    for description in descriptions:
        caption = np.array(description[:desired_caption_size])
        combined_X = np.concatenate([image_features, caption])
        
        X.append(combined_X)
        y.append(to_categorical(description[:desired_caption_size], VOCAB_SIZE)[0])

    return tf.convert_to_tensor(X), tf.convert_to_tensor(y)


In [33]:
# TODO change and comment

# data generator, intended to be used in a call to model.fit_generator()
def data_generator(filename_description_dictionary, img_features_dict, desired_caption_size, loops):
    while loops >= 1:
        np.random.seed(RANDOM_SEED)

        # shuffle filename order for better distribution over multiple loops (epochs)
        all_filenames = list(filename_description_dictionary.keys())
        np.random.shuffle(all_filenames)
        
        # loop for ever over files
        for filename in all_filenames:
            # get the corresponding descriptions
            descriptions = filename_description_dictionary[filename]

            # retrieve the photo feature
            img_features = img_features_dict[filename][0]

            input_vector, out_word = create_sequences(img_features, descriptions, desired_caption_size)
            yield input_vector, out_word

        loops -= 1

In [44]:
from tensorflow.keras.layers import ReLU

def generate_logistic_model(input_size):

    linear_activation = ReLU(negative_slope=1)

    model = Sequential()

    model.add(Dense(input_size, activation=linear_activation))

    model.add(Dense(VOCAB_SIZE, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer='adam')

    return model

In [23]:
from sklearn.linear_model import LogisticRegression


class LogisticDecoder():
    '''
    '''

    def __init__(self, caption_max_length, solver='liblinear'):
        '''
        '''
        self.max_len = caption_max_length

        # generate a model that takes in an image feature vector, and the caption so far, and outputs the next word
        # each model corresponds to a specefic input caption length
        self.models = [generate_logistic_model(4096 + i + 1) for i in range(caption_max_length)]


    def fit(self, sample_dictionary, image_feature_dictionary, verbose=False):
        samples_x, samples_y = dictionary_to_model_samples(sample_dictionary, image_feature_dictionary, max_length=MAX_LENGTH)

        if verbose:
            print('Samples Generated, beginning training')
        

        for i in range(self.max_len):
            current_x = samples_x[i]
            current_y = samples_y[i]

            if verbose:
                print(f'Training model #{i+1}')

            self.models[i].fit(current_x, current_y)

    def prediction(self, image_feature):
        for i in range(self.max_len):
            

In [24]:
logistic_decoder = LogisticDecoder(1)

In [46]:
test_model = logistic_decoder.models[0]

In [49]:
first_generator = data_generator(validation_dict, val_image_features, 1, loops=1)

In [50]:
test_model.fit_generator(first_generator, epochs=1, verbose=1)

  """Entry point for launching an IPython kernel.




<keras.callbacks.History at 0x1c211bd0ec8>