原始链接：https://tensorflow.google.cn/tutorials/text/image_captioning
参考论文：https://arxiv.org/pdf/1502.03044.pdf

In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

In [None]:
import re
import numpy as np
import os
import time
import json
import pickle
from glob import glob
from PIL import Image

# MS-COCO 数据集
82000 张图片，每张图片至少有5个标注说明

In [None]:
# Locate the MS-COCO caption annotations, downloading and unpacking them
# first when they are not on disk yet.
annotation_folder = "../datasets/annotations/"
# Resolve to an absolute path instead of concatenating strings:
# abspath('.') + "../datasets/..." yields ".../cwd../datasets/...", which
# never exists.
annotation_dir = os.path.abspath(annotation_folder)
# Defined before the check so it is available whether or not a download
# is needed.
annotation_file = os.path.join(annotation_dir, 'captions_train2014.json')
if not os.path.exists(annotation_dir):
    # NOTE(review): assumes get_file unpacks the archive next to the
    # downloaded zip and that the archive holds a top-level "annotations/"
    # directory -- confirm for the installed TensorFlow version.
    annotation_zip = tf.keras.utils.get_file(
        'captions.zip',
        cache_subdir=os.path.dirname(annotation_dir),
        origin=
        'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
        extract=True)
    os.remove(annotation_zip)

In [None]:
# Locate the MS-COCO 2014 training images, downloading and unpacking them
# into the working directory when they are not on disk yet.
image_folder = '/train2014/'
if not os.path.exists(os.path.abspath(".") + image_folder):
    # `origin` is mandatory for get_file; without it the call fails before
    # anything is downloaded.
    image_zip = tf.keras.utils.get_file(
        'train2014.zip',
        cache_subdir=os.path.abspath("."),
        origin='http://images.cocodataset.org/zips/train2014.zip',
        extract=True)
    PATH = os.path.dirname(image_zip) + image_folder
    os.remove(image_zip)
else:
    PATH = os.path.abspath(".") + image_folder

In [None]:
# Parse the caption annotations. JSON is read as UTF-8 explicitly so the
# result does not depend on the platform's default locale encoding.
with open(annotation_file, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

>标注的文本进行预处理，添加 `<start>,<end>` 标签；一张图片路径对应一个标注序列

In [None]:
# Build two index-aligned lists: one caption per annotation and the path of
# the image that caption describes.
all_captions = []  # caption text wrapped in <start>/<end> markers
all_image_name_vector = []  # image path for the caption at the same index
for annot in annotations['annotations']:
    # The surrounding spaces keep the markers as separate tokens; without
    # them the tokenizer sees e.g. "<start>A" and never learns "<start>".
    caption = "<start> " + annot["caption"] + " <end>"
    image_id = annot["image_id"]
    # MS-COCO 2014 file names are "COCO_train2014_" + zero-padded 12-digit id.
    full_coco_image_path = PATH + "COCO_train2014_" + "%012d.jpg" % (image_id)
    all_image_name_vector.append(full_coco_image_path)
    all_captions.append(caption)

In [None]:
# Shuffle captions and image paths in unison so every pair stays aligned;
# the fixed seed makes the order reproducible.
train_captions, img_name_vector = shuffle(
    all_captions, all_image_name_vector, random_state=1)

In [None]:
# Restrict the data to the first `num_examples` shuffled pairs.
num_examples = 30000
train_captions, img_name_vector = (
    train_captions[:num_examples],  # training labels
    img_name_vector[:num_examples],  # training inputs
)

# InceptionV3 模型预处理图片

In [None]:
def load_image(image_path):
    """Read a JPEG file and turn it into an InceptionV3 input tensor.

    The file is decoded to 3 channels, resized to 299x299 and passed through
    InceptionV3's `preprocess_input`.

    Returns:
        A `(image_tensor, image_path)` pair; the path is handed back
        unchanged so that a dataset mapped over paths keeps each image
        associated with its source file.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, (299, 299))
    prepared = tf.keras.applications.inception_v3.preprocess_input(resized)
    return prepared, image_path

> - 形状变为 299px * 299px
> - inception_v3 的 preprocess_input 方法，标准化像素数值到 -1 ~ 1 之间

# 初始化InceptionV3模型

In [None]:
# InceptionV3 with ImageNet weights and without its classification head,
# re-wrapped as a model that maps an input image to the output of the
# network's last remaining layer.
image_model = tf.keras.applications.InceptionV3(
    weights='imagenet',
    include_top=False,
)
new_input = image_model.input
hidden_layer = image_model.layers[-1].output
image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

>只利用 InceptionV3 的特征提取层

# InceptionV3提取图片特征，并保存

In [None]:
# Image input pipeline: every distinct image path (sorted, de-duplicated)
# is loaded and preprocessed in parallel, then grouped into batches of 16.
encode_train = sorted(set(img_name_vector))
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(
    load_image,
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
image_dataset = image_dataset.batch(16)

In [None]:
from tqdm import tqdm

# Extract features batch by batch and cache each image's features on disk.
for image_batch, path_batch in tqdm(image_dataset):
    feature_maps = image_features_extract_model(image_batch)
    # Collapse the two spatial axes into one:
    # (batch, h, w, channels) -> (batch, h * w, channels).
    n_images = feature_maps.shape[0]
    n_channels = feature_maps.shape[3]
    feature_maps = tf.reshape(feature_maps, (n_images, -1, n_channels))
    for image_features, path_tensor in zip(feature_maps, path_batch):
        # np.save appends ".npy", so the cache file is "<image path>.npy".
        target_path = path_tensor.numpy().decode('utf-8')
        np.save(target_path, image_features.numpy())

# 预处理图片标注并向量化

In [None]:
# Longest caption length.
def calc_max_length(tensor):
    """Return the length of the longest element in `tensor`.

    Raises:
        ValueError: if `tensor` is empty.
    """
    return max(map(len, tensor))

In [None]:
# Build the caption tokenizer with `num_words=top_k`; unknown words are
# mapped to the '<unk>' token.
top_k = 5000
tokenizer = tf.keras.preprocessing.text.Tokenizer(
    num_words=top_k,
    oov_token='<unk>',
    # "\\" is the same literal backslash the former "\]" produced, but no
    # longer relies on an invalid escape sequence. '<' and '>' are not in
    # the filter set, so the <start>/<end> markers survive tokenization.
    filters='!"$%&()*+.,-/:;=?@[\\]^_`{|}~ ')

tokenizer.fit_on_texts(train_captions)

# Convert every caption into its sequence of integer token ids.
train_seqs = tokenizer.texts_to_sequences(train_captions)

In [None]:
# Register index 0 as the padding token in both lookup directions.
tokenizer.index_word[0] = '<pad>'
tokenizer.word_index['<pad>'] = 0

In [None]:
# Pad all sequences to a common length; padding='post' appends the padding
# after the caption tokens.
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(
    train_seqs, padding='post')


In [None]:
# Longest caption length in tokens; `evaluate` uses it to size the buffer
# that stores the attention weights and to bound the decoding loop.
max_length = calc_max_length(train_seqs)

# 拆分数据集

In [None]:
# Hold out 20% of the (image path, caption vector) pairs for validation;
# the fixed seed keeps the split reproducible.
(img_name_train, img_name_val,
 cap_train, cap_val) = train_test_split(img_name_vector,
                                        cap_vector,
                                        test_size=0.2,
                                        random_state=0)

# 创建数据管道

In [None]:
# Training hyper-parameters and model dimensions.
BATCH_SIZE = 64  # examples per training batch
BUFFER_SIZE = 1000  # shuffle buffer size of the training dataset
embedding_size = 256  # passed to the encoder/decoder as `embedding_dim`
units = 512  # GRU and attention layer width
# NOTE(review): presumably +1 to account for the padding index 0 -- confirm.
vocab_size = top_k + 1
# Number of full batches per epoch; used to average the epoch loss.
num_steps = len(img_name_train) // BATCH_SIZE
# NOTE(review): presumably the channel depth of the cached InceptionV3
# features; not referenced elsewhere in the visible code.
features_shape = 2048
# Width of one attention-weight row stored by `evaluate`.
attention_features_shape = 64

In [None]:
def map_func(img_name, cap):
    """Load the cached feature array that belongs to one image.

    Args:
        img_name: UTF-8 encoded bytes of the image path; the features are
            read from "<img_name>.npy".
        cap: the caption belonging to the image, passed through untouched.

    Returns:
        A `(features, cap)` pair.
    """
    feature_file = img_name.decode('utf-8') + '.npy'
    return np.load(feature_file), cap

In [None]:
# Pair every image path with its caption vector. from_tensor_slices takes a
# single (possibly nested) structure, so both arrays go in as one tuple; a
# second positional argument would be interpreted as the `name` parameter.
dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

In [None]:
# Replace each image path by its cached feature array. `map_func` relies on
# np.load, so it is wrapped in tf.numpy_function. `num_parallel_calls` is an
# argument of Dataset.map (tf.numpy_function does not accept it).
dataset = dataset.map(
    lambda item1, item2: tf.numpy_function(
        map_func, [item1, item2], [tf.float32, tf.int32]),
    num_parallel_calls=tf.data.experimental.AUTOTUNE)

# Shuffle, batch, and let the pipeline prepare upcoming batches while the
# current one is being consumed.
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

# 模型
- 通过 InceptionV3 的最后一层卷积层提取特征，获得张量形状 (8, 8, 2048)
- 形状改变为 (64, 2048)
- 上一步的张量传给 CNN 编码器（单一的全连接层）
- RNN 解码器对图片进行注意力计算，预测标签    
<img src="../images/img_caption.PNG" width="80%">

In [None]:
class BahdanauAttention(tf.keras.Model):
    """Additive attention over the feature locations of an image.

    Two dense projections (`W1` for the features, `W2` for the decoder
    state) are summed, squashed with tanh and reduced to one score per
    location by `V`.
    """

    def __init__(self, units):
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        """Return `(context_vector, attention_weights)`.

        Assumes `features` is (batch_size, locations, embedding_dim) and
        `hidden` is (batch_size, hidden_size) -- TODO confirm against the
        callers.
        """
        # Insert a length-1 axis so the state broadcasts over all locations:
        # (batch_size, hidden_size) -> (batch_size, 1, hidden_size).
        query = tf.expand_dims(hidden, 1)

        # Combined projection, one `units`-wide vector per location.
        projected = tf.nn.tanh(self.W1(features) + self.W2(query))

        # One scalar score per location, normalised over the location axis:
        # (batch_size, locations, 1).
        attention_weights = tf.nn.softmax(self.V(projected), axis=1)

        # Weighted sum of the features over the location axis:
        # (batch_size, embedding_dim).
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)
        return context_vector, attention_weights

In [None]:
class CNN_Encoder(tf.keras.Model):
    """Single dense layer followed by ReLU, applied to image features."""

    def __init__(self, embedding_dim):
        super().__init__()
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Project `x` to `embedding_dim` on its last axis and apply ReLU."""
        # The dense layer only changes the last axis, e.g.
        # (batch_size, 64, channels) -> (batch_size, 64, embedding_dim).
        return tf.nn.relu(self.fc(x))

In [None]:
class RNN_Decoder(tf.keras.Model):
    """Attention-based GRU decoder that predicts one token per call."""

    def __init__(self, embedding_dim, units, vocab_size):
        super().__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            self.units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        """Run one decoding step.

        Args:
            x: token ids of the current decoder input.
            features: encoded image features attended over.
            hidden: previous decoder state; it only feeds the attention
                module (the GRU itself is invoked without an initial state).

        Returns:
            `(logits, state, attention_weights)` where `logits` is flattened
            to (batch_size * steps, vocab_size).
        """
        # Attention lives in its own sub-model.
        context_vector, attention_weights = self.attention(features, hidden)

        # Token ids -> embedding vectors, one per input step.
        embedded = self.embedding(x)

        # Prepend the context vector along the last (feature) axis.
        gru_input = tf.concat(
            [tf.expand_dims(context_vector, 1), embedded], axis=-1)

        output, state = self.gru(gru_input)

        # Dense projection, then merge the batch and step axes so that the
        # final layer yields one row of vocabulary logits per step.
        projected = self.fc1(output)
        flattened = tf.reshape(projected, (-1, projected.shape[2]))
        logits = self.fc2(flattened)

        return logits, state, attention_weights

    def reset_states(self, batch_size):
        """Return an all-zero decoder state of shape (batch_size, units)."""
        return tf.zeros((batch_size, self.units))

In [None]:
# Instantiate the two trainable models used by training and evaluation.
encoder = CNN_Encoder(embedding_size)
decoder = RNN_Decoder(embedding_size, units, vocab_size)

In [None]:
# Adam with default settings. The loss is computed on raw logits and left
# unreduced (reduction='none') so that `loss_function` can mask out padding
# positions before averaging.
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    reduction='none',
    from_logits=True,
)

In [None]:
def loss_function(real, pred):
    """Cross-entropy between `real` ids and `pred` logits, ignoring padding.

    Positions where `real` equals 0 (the padding id) contribute a loss of 0;
    the mean is still taken over all positions, padded ones included.
    """
    per_position = loss_object(real, pred)
    is_real_token = tf.math.logical_not(tf.math.equal(real, 0))
    weights = tf.cast(is_real_token, dtype=per_position.dtype)
    return tf.reduce_mean(per_position * weights)

# Checkpoint

In [None]:
# Track both models and the optimizer in one checkpoint; the manager keeps
# at most the five most recent checkpoints under `checkpoint_path`.
checkpoint_path = "../models/image_caption/checkpoint/train"
ckpt = tf.train.Checkpoint(
    encoder=encoder,
    decoder=decoder,
    optimizer=optimizer,
)
ckpt_manager = tf.train.CheckpointManager(
    ckpt, checkpoint_path, max_to_keep=5)

In [None]:
# Resume from the newest checkpoint when one exists. The number after the
# last '-' in the checkpoint name is taken as the epoch to start from.
# NOTE(review): that suffix counts saves, which equals the epoch number only
# if a checkpoint is written every epoch -- confirm against the training loop.
start_epoch = 0
latest_checkpoint = ckpt_manager.latest_checkpoint
if latest_checkpoint:
    start_epoch = int(latest_checkpoint.rsplit('-', 1)[-1])
    ckpt.restore(latest_checkpoint)

# 训练模型

In [None]:
# Loss history collected by the training loop and plotted afterwards.
loss_plot = []

In [None]:
@tf.function
def train_step(img_tensor, target):
    """Run one optimisation step on a batch using teacher forcing.

    Args:
        img_tensor: batch of cached image features fed to the encoder.
        target: batch of padded caption id sequences; column 0 is skipped as
            a prediction target because decoding starts from '<start>'.

    Returns:
        `(loss, train_loss)`: the loss summed over all decoding steps, and
        that sum divided by the caption length.
    """
    batch_size = target.shape[0]
    caption_len = target.shape[1]

    loss = 0
    hidden = decoder.reset_states(batch_size=batch_size)
    start_id = tokenizer.word_index['<start>']
    dec_input = tf.expand_dims([start_id] * batch_size, 1)

    with tf.GradientTape() as tape:
        features = encoder(img_tensor)

        for step in range(1, caption_len):
            predictions, hidden, _ = decoder(dec_input, features, hidden)
            loss += loss_function(target[:, step], predictions)
            # Teacher forcing: the ground-truth token is the next input.
            dec_input = tf.expand_dims(target[:, step], 1)

    # Only the forward pass needs to be recorded; the gradient and the
    # update are computed after the tape context has been left.
    train_loss = loss / int(caption_len)
    trainable_variables = (encoder.trainable_variables +
                           decoder.trainable_variables)
    gradients = tape.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))
    return loss, train_loss

In [None]:
# Upper bound (exclusive) of the epoch range used by the training loop.
EPOCHS = 20

In [None]:
# Train from `start_epoch` up to `EPOCHS`, logging every 100th batch and
# doing the bookkeeping (loss history, checkpoint, summary) once per epoch.
for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    total_loss = 0

    for (batch, (img_tensor, target)) in enumerate(dataset):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss

        if batch % 100 == 0:
            print("Epoch {} Batch {} Loss {:.4f}".format(
                epoch + 1, batch,
                batch_loss.numpy() / int(target.shape[1])))

    # Everything below runs once per epoch, after the last batch: the loss
    # plot is labelled in epochs, and saving a checkpoint or printing the
    # epoch summary after every single batch would be wrong and very slow.
    loss_plot.append(total_loss / num_steps)

    if epoch % 5 == 0:
        ckpt_manager.save()

    print("Epoch {} Loss {:.6f}".format(epoch + 1, total_loss / num_steps))
    print("Time taken for 1 epoch {} sec\n".format(time.time() - start))

In [None]:
# Plot the recorded loss history.
plt.plot(loss_plot)
plt.title("Loss Plot")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.show()

# 验证模型

In [None]:
def evaluate(image):
    """Generate a caption for the image file at path `image`.

    Tokens are sampled from the decoder's output distribution one at a time
    until '<end>' is produced or `max_length` tokens have been generated.

    Returns:
        `(result, attention_plot)`: the list of generated words (including
        '<end>' when it was produced) and an array holding one row of
        attention weights per generated word.
    """
    attention_plot = np.zeros((max_length, attention_features_shape))
    hidden = decoder.reset_states(batch_size=1)

    # Preprocess the image and add a batch axis of size 1.
    temp_input = tf.expand_dims(load_image(image)[0], 0)
    image_tensor_val = image_features_extract_model(temp_input)
    # Collapse the spatial axes, as done when the training features were
    # cached.
    image_tensor_val = tf.reshape(
        image_tensor_val,
        (image_tensor_val.shape[0], -1, image_tensor_val.shape[3]))
    features = encoder(image_tensor_val)

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(
            dec_input, features, hidden)
        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        # Sample the next token id from the predicted logits.
        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        word = tokenizer.index_word[predicted_id]
        result.append(word)

        if word == '<end>':
            break

        # Feed the sampled token back in as the next decoder input.
        dec_input = tf.expand_dims([predicted_id], 0)

    # Both exits trim the buffer, so it always has exactly one row per
    # generated word (the early '<end>' exit used to return it untrimmed).
    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot

In [None]:
def plot_attention(image, result, attention_plot):
    """Show the image once per generated word, overlaid with its attention.

    Args:
        image: path of the image file.
        result: list of generated words.
        attention_plot: array with one row of 64 attention weights per word.
    """
    temp_image = np.array(Image.open(image))
    fig = plt.figure(figsize=(10, 10))
    len_result = len(result)

    # A grid of (len_result // 2) ** 2 cells is too small (or empty) for
    # 1, 2, 3 or 5 words and makes add_subplot raise. Rounding up and never
    # going below 2 x 2 always leaves at least `len_result` cells.
    grid_size = max(int(np.ceil(len_result / 2)), 2)

    for idx in range(len_result):
        # Each row holds the weights of the 8 x 8 feature locations.
        temp_att = np.resize(attention_plot[idx], (8, 8))
        ax = fig.add_subplot(grid_size, grid_size, idx + 1)
        ax.set_title(result[idx])
        img = ax.imshow(temp_image)
        # Stretch the coarse attention map over the full image extent.
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())
    plt.tight_layout()
    plt.show()

In [None]:
# Caption one randomly chosen validation image and compare the prediction
# with its reference caption (padding id 0 is left out).
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
real_words = [tokenizer.index_word[i] for i in cap_val[rid] if i != 0]
real_caption = ' '.join(real_words)
result, attention_plot = evaluate(image)

print('Real Caption:', real_caption)
print('Prediction Caption:', ' '.join(result))
plot_attention(image, result, attention_plot)


In [None]:
# Caption an image fetched from the web.
image_url = 'https://tensorflow.org/images/surf.jpg'
# Re-use the last four characters of the URL as the local file extension.
image_extension = image_url[-4:]
image_path = tf.keras.utils.get_file(
    'image' + image_extension, origin=image_url)

result, attention_plot = evaluate(image_path)
print('Prediction Caption:', ' '.join(result))
plot_attention(image_path, result, attention_plot)
# Last expression of the cell: open the downloaded image.
Image.open(image_path)