**DATA PREPARATION**

In [1]:
import os
from pickle import load, dump
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical, plot_model
import matplotlib.pyplot as plt
from keras.models import Model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout
from keras.layers.merge import add
import string
import numpy as np


def extract_features(directory):
    model = VGG16()
    model = Model(inputs=model.inputs, outputs=model.layers[-2].output)
    print(model.summary())

    features = {}
    for name in os.listdir(directory):
        filename = directory + '/' + name
        image = load_img(filename, target_size=(224, 224))
        image = img_to_array(image)
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        image = preprocess_input(image)
        feature = model.predict(image, verbose=0)
        image_id = name.split('.')[0]
        features[image_id] = feature
        # print('>', name)
    return features


dataset_img = "DATASETS\\Flickr8k_Dataset\\Flicker8k_Dataset(TEST)"
features = extract_features(dataset_img)
print('Extracted features: ', len(features))
dump(features, open('features(NEW).p', 'wb'))

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0     

In [2]:
def load_doc(filename):
    file = open(filename, 'r')
    text = file.read()
    file.close()
    return text

dataset_txt = 'DATASETS\\Flickr8k_text\\'
txt_token = dataset_txt + 'Flickr8k.token(TEST).txt'
doc = load_doc(txt_token)
# print(doc.split('\n'))

In [3]:
def load_descriptions(doc):
    mapping = {}
    for line in doc.split('\n'):
        tokens = line.split('.')
        if len(tokens) < 2:
            continue
        image_id, image_desc = tokens[0], tokens[1:]
        image_desc = ''.join(image_desc)
        if image_id not in mapping:
            mapping[image_id] = []
        mapping[image_id].append(image_desc)
    return mapping

descriptions = load_descriptions(doc)
print('Loaded: ', len(descriptions))
# print(descriptions)

Loaded:  6


In [4]:
def clean_descriptions(descriptions):
    table = str.maketrans('', '', string.punctuation)
    for img, cap in descriptions.items():
        for num_part, caption_part in enumerate(cap):

            caption_part.replace('-', ' ')
            desc = caption_part.split()

            desc = [word.lower() for word in desc]
            desc = [word.translate(table) for word in desc]
            desc = [word for word in desc if(len(word) > 1)]
            desc = [word for word in desc if(word.isalpha())]

            caption_part = ' '.join(desc)
            descriptions[img][num_part] = caption_part

    return descriptions

descriptions = clean_descriptions(descriptions)
# print(descriptions)

In [5]:
def to_vocabulary(descriptions):
    all_desc = set()
    for value in descriptions.values():
        [all_desc.update(d.split()) for d in value]
    return all_desc

vocab = to_vocabulary(descriptions)
# print(vocab)
print('Vocabulary size:', len(vocab))

Vocabulary size: 118


In [6]:
def save_descriptions(descriptions, filename):
    lines = []
    for image_id, image_desc in descriptions.items():
        for desc in image_desc:
            lines.append(image_id + '\t' + desc)
    data = '\n'.join(lines)
    file = open(filename, 'w')
    file.write(data)
    file.close()
    
save_descriptions(descriptions, 'descriptions(NEW).txt')

**DEVELOPING DEEP LEARNING MODEL**

**Loading Data**

In [7]:
def load_set(filename):
    doc = load_doc(filename)
    dataset = []
    for line in doc.split('\n'):
        if len(line) < 1:
            continue
        identifier = line.split('.')[0]
        dataset.append(identifier)
    return set(dataset)

trainImages = dataset_txt + '/Flickr_8k.trainImages(TEST).txt'
train = load_set(trainImages)
print('Training images loaded:', len(train))
# print(train)

Training images loaded: 6


In [8]:
def load_clean_descriptions(filename, dataset):
    doc = load_doc(filename)
    descriptions = {}
    for line in doc.split('\n'):
        tokens = line.split('\t')
        image_id, image_desc = tokens[0], tokens[1:]
        if image_id in dataset:
            if image_id not in descriptions:
                descriptions[image_id] = []
            desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
            descriptions[image_id].append(desc)
    return descriptions

train_descriptions = load_clean_descriptions('descriptions(NEW).txt', train)
print("Training descriptions loaded:", len(train_descriptions))
# print(train_descriptions)

Training descriptions loaded: 6


In [9]:
def load_photo_features(filename, dataset):
    all_features = load(open(filename, 'rb'))
    features = {k: all_features[k] for k in dataset}
    return features

train_features = load_photo_features('features(NEW).p', train)
print('Features loaded:', len(train_features.keys()))
# for i, feature in enumerate(features.values()):
#     print(f'@{i} - {feature}')

Features loaded: 6


In [10]:
def dict_to_list(descriptions):
    all_desc = []
    for values in descriptions.values():
        [all_desc.append(x) for x in values]
    return all_desc

def create_tokenizer(descriptions):
    lines = dict_to_list(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(lines)
    return tokenizer

tokenizer = create_tokenizer(train_descriptions)
dump(tokenizer, open('tokenizer(NEW).p', 'wb'))
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary size:', vocab_size)

Vocabulary size: 121


In [11]:
def create_sequences(tokenizer, max_length, desc_list, photo, vocab_size):
    x1, x2, y = [], [], []
    for desc in desc_list:
        seq = tokenizer.texts_to_sequences([desc])[0]
        for i in range(1, len(seq)):
            input_seq, output_seq = seq[:i], seq[i]
            input_seq = pad_sequences([input_seq], maxlen=max_length)[0]
            output_seq = to_categorical([output_seq], num_classes=vocab_size)[0]
            x1.append(photo)
            x2.append(input_seq)
            y.append(output_seq)
    return np.array(x1), np.array(x2), np.array(y)

def max_length(descriptions):
    lines = dict_to_list(descriptions)
    return max(len(d.split()) for d in lines)

max_length = max_length(train_descriptions)
print('Maximum sentence length:', max_length)

Maximum sentence length: 18


**Defining the Model**

In [12]:
def define_model(vocab_size, max_length):
    # Извлечение особенностей
    inputs1 = Input(shape=(4096,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)
    # Процессор последовательностей
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)
    # Декодер
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)
    # Связывание всего в единое
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    # Подведение итогов
    print(model.summary())
    # plot_model(model, to_file='models(NEW)/model.png', show_shapes=True)
    return model

**Fitting the Model**

In [13]:
def data_generator(descriptions, photos, tokenizer, max_length, vocab_size):
    while 1:
        for key, desc_list in descriptions.items():
            photo = photos[key][0]
            in_img, in_seq, out_word = create_sequences(tokenizer, max_length, desc_list, photo, vocab_size)
            yield [in_img, in_seq], out_word

generator = data_generator(train_descriptions, train_features, tokenizer, max_length, vocab_size)
inputs, outputs = next(generator)
print(inputs[0].shape)
print(inputs[1].shape)
print(outputs.shape)


(66, 4096)
(66, 18)
(66, 121)


In [14]:
model = define_model(vocab_size, max_length)
epochs = 75
steps = len(train_descriptions)
for i in range(epochs):
    generator =  data_generator(train_descriptions, train_features, tokenizer, max_length, vocab_size)
    model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)
    
    model.save('models(NEW)/model_' + str(i+1) + '.h5')

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 18)]         0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, 4096)]       0           []                               
                                                                                                  
 embedding (Embedding)          (None, 18, 256)      30976       ['input_3[0][0]']                
                                                                                                  
 dropout (Dropout)              (None, 4096)         0           ['input_2[0][0]']                
                                                                                            

  model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)




In [None]:
from nltk.translate.bleu_score import corpus_bleu
from tensorflow.keras.models import load_model


def word_for_id(integer, tokenizer):
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None

def generate_desc(model, tokenizer, photo, max_length):
    input_text  = 'startseq'
    for i in range(max_length):
        seq = tokenizer.texts_to_sequences([input_text])[0]
        seq = pad_sequences([seq], maxlen=max_length)
        yhat = model.predict([photo, seq], verbose=0)
        yhat = np.argmax(yhat)
        word = word_for_id(yhat, tokenizer)
        if word is None:
            break
        input_text += ' ' + word
        if word == 'endseq':
            break
    return input_text

# dump(tokenizer, open('tokenizer(NEW).p', 'wb'))

In [14]:
from tensorflow.keras.models import load_model
from pickle import load


def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    actual, predicted = [], []
    for key, desc_list in descriptions.items():
        yhat = generate_desc(model, tokenizer, photos[key], max_length)
        references = [d.split() for d in desc_list]
        actual.append(references)
        predicted.append(yhat.split())
    print('BLEU-1:', corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
    print('BLEU-2:', corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
    print('BLEU-3:', corpus_bleu(actual, predicted, weights=(0.3, 0.3, 0.3, 0)))
    print('BLEU-4:', corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))
    
testImages = 'DATASETS\\Flickr8k_text\\Flickr_8k.devImages.txt'
test = load_set(testImages)
print('Dataset:', len(test))
test_descriptions = load_clean_descriptions('descriptions(NEW).txt', test)
print('Descriptions: test=', len(test_descriptions))
test_features = load_photo_features('features(NEW).p', test)
print('Photos: test=', len(test_features))
tokenizer = load(open('tokenizer(NEW).p', 'rb'))
model = load_model('models/model_9.h5')
max_length = 34

evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

Dataset: 1000
Descriptions: test= 1000
Photos: test= 1000


ValueError: in user code:

    File "D:\Anaconda\lib\site-packages\keras\engine\training.py", line 1801, in predict_function  *
        return step_function(self, iterator)
    File "D:\Anaconda\lib\site-packages\keras\engine\training.py", line 1790, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "D:\Anaconda\lib\site-packages\keras\engine\training.py", line 1783, in run_step  **
        outputs = model.predict_step(data)
    File "D:\Anaconda\lib\site-packages\keras\engine\training.py", line 1751, in predict_step
        return self(x, training=False)
    File "D:\Anaconda\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "D:\Anaconda\lib\site-packages\keras\engine\input_spec.py", line 264, in assert_input_compatibility
        raise ValueError(f'Input {input_index} of layer "{layer_name}" is '

    ValueError: Input 0 of layer "model_1" is incompatible with the layer: expected shape=(None, 2048), found shape=(None, 4096)


**Полная версия**

In [3]:
from os import listdir
from pickle import load, dump
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical, plot_model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint
from nltk.translate.bleu_score import corpus_bleu
import string
import numpy as np


def extract_features(directory):
    model = VGG16()
    model = Model(inputs=model.inputs, outputs=model.layers[-2].output)
    print(model.summary())

    features = {}
    for name in listdir(directory):
        filename = directory + '/' + name
        image = load_img(filename, target_size=(224, 224))
        image = img_to_array(image)
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        image = preprocess_input(image)
        feature = model.predict(image, verbose=0)
        image_id = name.split('.')[0]
        features[image_id] = feature
        # print('>', name)
    return features

dataset_img = "Flickr8k_Dataset\\Flickr8k_Dataset"
features = extract_features(dataset_img)
print('Extracted features: ', len(features))
dump(features, open('features(NEW).p', 'wb'))

def load_doc(filename):
    file = open(filename, 'r')
    text = file.read()
    file.close()
    return text

def load_descriptions(doc):
    mapping = {}
    for line in doc.split('\n'):
        tokens = line.split('.')
        if len(tokens) < 2:
            continue
        image_id, image_desc = tokens[0], tokens[1:]
        image_desc = ''.join(image_desc)
        if image_id not in mapping:
            mapping[image_id] = []
        mapping[image_id].append(image_desc)
    return mapping

def clean_descriptions(descriptions):
    table = str.maketrans('', '', string.punctuation)
    for img, cap in descriptions.items():
        for num_part, caption_part in enumerate(cap):

            caption_part.replace('-', ' ')
            desc = caption_part.split()

            desc = [word.lower() for word in desc]
            desc = [word.translate(table) for word in desc]
            desc = [word for word in desc if(len(word) > 1)]
            desc = [word for word in desc if(word.isalpha())]

            caption_part = ' '.join(desc)
            descriptions[img][num_part] = caption_part

    return descriptions

def to_vocabulary(descriptions):
    all_desc = set()
    for value in descriptions.values():
        [all_desc.update(d.split()) for d in value]
    return all_desc

def save_descriptions(descriptions, filename):
    lines = []
    for image_id, image_desc in descriptions.items():
        for desc in image_desc:
            lines.append(image_id + '\t' + desc)
    data = '\n'.join(lines)
    file = open(filename, 'w')
    file.write(data)
    file.close()
    
dataset_txt = 'Flickr8k_text//'
txt_token = dataset_txt + 'Flickr8k.token.txt'
doc = load_doc(txt_token)
# print(doc.split('\n'))

descriptions = load_descriptions(doc)
print('Loaded: ', len(descriptions))
# print(descriptions)

descriptions = clean_descriptions(descriptions)
# print(descriptions)

vocab = to_vocabulary(descriptions)
# print(vocab)
print('Vocabulary size:', len(vocab))

save_descriptions(descriptions, 'descriptions(NEW).txt')

def load_set(filename):
    doc = load_doc(filename)
    dataset = []
    for line in doc.split('\n'):
        if len(line) < 1:
            continue
        identifier = line.split('.')[0]
        dataset.append(identifier)
    return set(dataset)

def load_clean_descriptions(filename, dataset):
    doc = load_doc(filename)
    descriptions = {}
    for line in doc.split('\n'):
        tokens = line.split('\t')
        image_id, image_desc = tokens[0], tokens[1:]
        if image_id in dataset:
            if image_id not in descriptions:
                descriptions[image_id] = []
            desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
            descriptions[image_id].append(desc)
    return descriptions

def load_photo_features(filename, dataset):
    all_features = load(open(filename, 'rb'))
    features = {k: all_features[k] for k in dataset}
    return features

def dict_to_list(descriptions):
    all_desc = []
    for values in descriptions.values():
        [all_desc.append(x) for x in values]
    return all_desc

def create_tokenizer(descriptions):
    lines = dict_to_list(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(lines)
    return tokenizer

def create_sequences(tokenizer, max_length, desc_list, photo, vocab_size):
    x1, x2, y = [], [], []
    for desc in desc_list:
        seq = tokenizer.texts_to_sequences([desc])[0]
        for i in range(1, len(seq)):
            input_seq, output_seq = seq[:i], seq[i]
            input_seq = pad_sequences([input_seq], maxlen=max_length)[0]
            output_seq = to_categorical([output_seq], num_classes=vocab_size)[0]
            x1.append(photo)
            x2.append(input_seq)
            y.append(output_seq)
    return np.array(x1), np.array(x2), np.array(y)

def max_length(descriptions):
    lines = dict_to_list(descriptions)
    return max(len(d.split()) for d in lines)

trainImages = dataset_txt + '/Flickr_8k.trainImages.txt'
train = load_set(trainImages)
print('Training images loaded:', len(train))
# print(train)

train_descriptions = load_clean_descriptions('descriptions(NEW).txt', train)
print("Training descriptions loaded:", len(train_descriptions))
# print(train_descriptions)

train_features = load_photo_features('features(NEW).p', train)
print('Features loaded:', len(train_features.keys()))

tokenizer = create_tokenizer(train_descriptions)
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary size:', vocab_size)

max_length = max_length(train_descriptions)
print('Maximum sentence length:', max_length)

def define_model(vocab_size, max_length):
    # Извлечение особенностей
    inputs1 = Input(shape=(4096,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)
    # Процессор последовательностей
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)
    # Декодер
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)
    # Связывание всего в единое
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    # Подведение итогов
    print(model.summary())
    plot_model(model, to_file='models/model(NEW).png', show_shapes=True)
    return model

def data_generator(descriptions, photos, tokenizer, max_length, vocab_size):
    while 1:
        for key, desc_list in descriptions.items():
            photo = photos[key][0]
            in_img, in_seq, out_word = create_sequences(tokenizer, max_length, desc_list, photo, vocab_size)
            yield [in_img, in_seq], out_word
            
# generator = data_generator(train_descriptions, train_features, tokenizer, max_length, vocab_size)
# inputs, outputs = next(generator)
# print(inputs[0].shape)
# print(inputs[1].shape)
# print(outputs.shape)

model = define_model(vocab_size, max_length)
epochs = 20
steps = len(train_descriptions)
for i in range(epochs):
    generator =  data_generator(train_descriptions, train_features, tokenizer, max_length, vocab_size)
    model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)
    model.save('models(NEW)/model_' + str(i+1) + '.h5')

def word_for_id(integer, tokenizer):
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None

def generate_desc(model, tokenizer, photo, max_length):
    input_text  = 'startseq'
    for i in range(max_length):
        seq = tokenizer.texts_to_sequences([input_text])[0]
        seq = pad_sequences([seq], maxlen=max_length)
        yhat = model.predict([photo, seq], verbose=0)
        yhat = np.argmax(yhat)
        word = word_for_id(yhat, tokenizer)
        if word is None:
            break
        input_text += ' ' + word
        if word == 'endseq':
            break
    return input_text

def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    actual, predicted = [], []
    for key, desc_list in descriptions.items():
        yhat = generate_desc(model, tokenizer, photos[key], max_length)
        references = [d.split() for d in desc_list]
        actual.append(references)
        predicted.append(yhat.split())
    print('BLEU-1:', corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
    print('BLEU-2:', corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
    print('BLEU-3:', corpus_bleu(actual, predicted, weights=(0.3, 0.3, 0.3, 0)))
    print('BLEU-4:', corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))
    
testImages = 'Flickr8k_text/Flickr_8k.devImages.txt'
test = load_set(testImages)
print('Dataset:', len(test))

test_descriptions = load_clean_descriptions('descriptions(NEW).txt', test)
print('Descriptions: test=', len(test_descriptions))

test_features = load_photo_features('features(NEW).p', test)
print('Photos: test=', len(test_features))

model = load_model('models(NEW)/model_1.h5')
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0   

  model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)




KeyboardInterrupt: 