# Descripción de imágenes con un modelo _Transformer (Tensorflow)_

Este notebook detalla un modelo _Transformer_ para subtitulado/descripción de imágenes.

La arquitectura del modelo es similar a la detallada en el paper [Attention Is All You Need](https://arxiv.org/pdf/1706.03762.pdf). 

Además la implementación se basa en el _Notebook_ [Caption_Transformer.ipynb](https://github.com/tanishqgautam/Image-Captioning/blob/main/Transformer/Caption_Transformer.ipynb)

***DataSet:*** 

Este notebook utiliza el conjunto de datos [MS-COCO](http://cocodataset.org/#home) para el entrenamiento y testeo del modelo.

## 1. Importar librerías

In [None]:
import sys

In [None]:
import tensorflow as tf
from tensorflow import keras

import matplotlib.pyplot as plt

import collections
import random
import string
import numpy as np
from numpy import array
import pandas as pd 
from PIL import Image
import os
import pickle
import time

from tqdm import tqdm

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense, BatchNormalization
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import add
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer

In [None]:
from sklearn.utils import shuffle

In [None]:
import json
import datetime
from pathlib import Path   
import re

## 2. Preparar entorno y el conjunto de datos _MS COCO_

Antes de continuar, es necesario descargar el conjunto de datos _MS COCO_, crear un directorio "ms-coco" y organizar los archivos con la siguiente estructura:

---
```
ms-coco
  annotations
  images
    train2014
    val2014
```
---

En el siguiente código se verifica que existan los subdirectorios `annotations` e `images` dentro del directorio ms-coco. Además, con la variable de entorno ***CUDA_VISIBLE_DEVICES*** se pueden especificar las GPU a utilizar.

In [None]:
# [IMPORTANT]: set CUDA_VISIBLE_DEVICES to choose which GPUs are visible.
# os.environ['CUDA_VISIBLE_DEVICES'] = '0,1'
# Sets the SM_FRAMEWORK environment variable for the rest of the process.
# NOTE(review): nothing in this file reads SM_FRAMEWORK; presumably it is
# consumed by an external package - confirm it is still needed.
os.environ["SM_FRAMEWORK"] = "tf.keras"

In [None]:
# Project root = parent of the current working directory, with a trailing
# separator so that the paths below can be built by plain concatenation.
# os.path is used instead of splitting on "/" so that the result is also
# correct when the working directory uses a different path separator.
root_dir = os.path.join(os.path.dirname(os.getcwd()), "")
print("INFO: El directorio ráiz de proyecto es:",root_dir)

In [None]:
# Location of the dataset, relative to the project root.
coco_dir="ms-coco/"
annotation_folder = "annotations/"
image_folder = "images/"

# Abort early when either of the two required dataset folders is missing.
required_dirs = (
    root_dir + coco_dir + annotation_folder,
    root_dir + coco_dir + image_folder,
)
if not all(os.path.exists(required) for required in required_dirs):
    raise Exception('ERR: Faltan archivos..' )

### Cargar _dataset_

In [None]:
# Load the 2014 training captions. annotation_folder already ends with "/",
# so the file name is appended without another separator, and the file is
# read explicitly as UTF-8 rather than with the platform default encoding.
train_captions_file = root_dir + coco_dir + annotation_folder + 'captions_train2014.json'
with open(train_captions_file, encoding='utf-8') as f:
    annotations = json.load(f)

# Maps each image path to the list of captions annotated for that image.
image_path_to_caption = collections.defaultdict(list)
for val in annotations['annotations']:
    caption = val['caption']
    # File names embed the image id zero-padded to 12 digits.
    image_path = root_dir +coco_dir + 'images/train2014/' + 'COCO_train2014_' + '%012d.jpg' % (val['image_id'])
    image_path_to_caption[image_path].append(caption)

In [None]:
# Load the 2014 validation captions, using the same annotation_folder
# variable as the training cell and reading the file explicitly as UTF-8.
val_captions_file = root_dir + coco_dir + annotation_folder + 'captions_val2014.json'
with open(val_captions_file, encoding='utf-8') as f:
    # dict.update replaces the existing 'annotations' entry, so from here on
    # `annotations['annotations']` holds the validation captions only; the
    # training captions were already stored in image_path_to_caption.
    annotations.update(json.load(f))

for val in annotations['annotations']:
    caption = val['caption']
    # File names embed the image id zero-padded to 12 digits.
    image_path = root_dir + coco_dir + 'images/val2014/' + 'COCO_val2014_' + '%012d.jpg' % (val['image_id'])
    image_path_to_caption[image_path].append(caption)

### Tamaño del _dataset_

In [None]:
# Distinct image paths, shuffled in place so their order is random.
image_paths = [*image_path_to_caption]
random.shuffle(image_paths)
print('INFO: Tamaño de image_paths:',len(image_paths))

In [None]:
# Flatten the path -> captions mapping into two parallel lists with one
# entry per caption (the image path is repeated once per caption).
all_captions = []
all_img_name_vector = []

for image_path in image_paths:
    caption_list = image_path_to_caption[image_path]
    for caption_text in caption_list:
        all_captions.append(caption_text)
        all_img_name_vector.append(image_path)

In [None]:
# One row per caption: running index, image path and caption text.
row_count = len(all_img_name_vector)
data = pd.DataFrame(
    {
        'index': list(range(row_count)),
        'filename': all_img_name_vector,
        'caption': all_captions,
    }
)

# Sorted array of the distinct image paths.
uni_filenames = np.unique(data["filename"].values)
data.head()

In [None]:
# Preview grid: one row per image, picture on the left and its captions
# rendered as text on the right.
npic = 5
npix = 224
# NOTE(review): load_img is given a 3-tuple as target_size; presumably only
# the first two entries (height, width) are used - confirm.
target_size = (npix,npix,3)

# `count` is the 1-based subplot position inside the npic x 2 grid.
count = 1
fig = plt.figure(figsize=(10,20))
for jpgfnm in uni_filenames[10:15]:
    filename = jpgfnm
    # All captions stored for this image.
    captions = list(data["caption"].loc[data["filename"]==jpgfnm].values)
    image_load = load_img(filename, target_size=target_size)
    
    # Left cell: the image itself, without axis ticks.
    ax = fig.add_subplot(npic,2,count,xticks=[],yticks=[])
    ax.imshow(image_load)
    count += 1
    
    # Right cell: an empty plot used as a canvas, one text line per caption.
    ax = fig.add_subplot(npic,2,count)
    plt.axis('off')
    ax.plot()
    ax.set_xlim(0,1)
    ax.set_ylim(0,len(captions))
    for i, caption in enumerate(captions):
        ax.text(0,i,caption,fontsize=20)
    count += 1
plt.show()

## 3. Pre-procesado de las imágenes

Para la extracción de características se utiliza la red _InceptionV3_ (preentrenada en _ImageNet_). 

Para lo que es necesario:
- Cambiar el tamaño de la imagen a 299px por 299px.
- Normalizar las imágenes con [preprocess_input](https://www.tensorflow.org/api_docs/python/tf/keras/applications/inception_v3/preprocess_input).

In [None]:
def load_image(image_path):
    """Read a JPEG file and prepare it as InceptionV3 input.

    The image is decoded with 3 channels, resized to 299x299 and passed
    through the InceptionV3 `preprocess_input` function.

    Args:
        image_path: path of the JPEG file.

    Returns:
        A tuple (preprocessed image tensor, image_path).
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, (299, 299))
    prepared = tf.keras.applications.inception_v3.preprocess_input(resized)
    return prepared, image_path

### Inicializar _InceptionV3_ y cargar los pesos de _ImageNet_ previamente entrenados.

A continuación se crea un modelo tf.keras cuya salida es la última capa convolucional de _InceptionV3_. La forma de la salida de esta capa es 8x8x2048.



In [None]:
# InceptionV3 without its classification head, initialised with the
# ImageNet weights. The class is `InceptionV3` and the weights identifier
# is 'imagenet'; the previous `_InceptionV3_` / '_ImageNet_' spellings were
# markdown-emphasis residue and do not exist in tf.keras.applications.
image_model = tf.keras.applications.InceptionV3(include_top=False,
                                                weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output

# Feature extractor: image in, output of the network's last layer out.
image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

In [None]:
# Distinct image paths, so each image is encoded only once.
encode_train = sorted(set(all_img_name_vector))

image_dataset = (
    tf.data.Dataset.from_tensor_slices(encode_train)
    .map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(16)
)

# The presence of the first image's feature file is used as the signal
# that the features were already extracted in a previous run.
features_already_saved = os.path.exists(all_img_name_vector[0]+'.npy')

if features_already_saved:
    print("INFO: Características en:", root_dir + coco_dir + 'images/[val2014|train2014]/')
else:
    for img, path in tqdm(image_dataset):
        batch_features = image_features_extract_model(img)
        # Flatten the two spatial axes into one: (batch, positions, channels).
        n_images = batch_features.shape[0]
        n_channels = batch_features.shape[3]
        batch_features = tf.reshape(batch_features, (n_images, -1, n_channels))

        # Store one array per image under the image's own path (np.save
        # is given the image path and a ".npy" file is looked up later).
        for bf, p in zip(batch_features, path):
            path_of_feature = p.numpy().decode("utf-8")
            np.save(path_of_feature, bf.numpy())
    

## 4. Pre-procesado de los subtítulos


In [None]:
# Every whitespace-separated word of every caption (duplicates included);
# the distinct count is reported as the raw vocabulary size.
vocabulary = [word for txt in data.caption.values for word in txt.split()]
print('INFO: Tamaño del vocabulario: %d' % len(set(vocabulary)))

In [None]:
# Wrap every caption with the <start> / <end> markers.
all_captions = [
    '<start> ' + caption + ' <end>'
    for caption in data["caption"].astype(str)
]
all_captions[:10]

In [None]:
# Image path of every caption row, in the same order as all_captions.
all_img_name_vector = list(data["filename"])
all_img_name_vector[:10]

In [None]:
# Report the length of both lists; each was built from one column of
# `data`, so the two numbers are expected to match.
print(f"INFO: Tamaño de all_img_name_vector = {len(all_img_name_vector)}")
print(f"INFO: Tamaño de all_captions = {len(all_captions)}")

---

### [OPCIONAL]: Limitar el conjunto de datos 
La función _"data_limiter"_ permite limitar el conjunto de datos, para reducir el tiempo del entrenamiento.

In [None]:
def data_limiter(num,
                 total_captions,
                 all_img_name_vector):
    """Return a shuffled subset of at most `num` caption/image pairs.

    Both sequences are shuffled together with a fixed seed (random_state=1)
    and then cut to their first `num` entries.

    Args:
        num: maximum number of pairs to keep.
        total_captions: sequence of captions.
        all_img_name_vector: sequence of image paths, shuffled together
            with `total_captions`.

    Returns:
        A tuple (captions, image paths) with the selected entries.
    """
    shuffled_captions, shuffled_paths = shuffle(
        total_captions, all_img_name_vector, random_state=1)
    return shuffled_captions[:num], shuffled_paths[:num]

In [None]:
# [OPTIONAL]
# captions, img_name_vector = data_limiter(40000, 
#                                          all_captions, 
#                                          all_img_name_vector)

# When the dataset is not limited, the full lists are bound to the names
# used downstream, so switching between both modes is a local change.
captions = all_captions
img_name_vector = all_img_name_vector

In [None]:
# Report the size of the (possibly limited) working set. The first label
# previously read "img_name_vectorimg_name_vector" (duplicated name).
print(f"INFO: Tamaño de img_name_vector = {len(img_name_vector)}")
print(f"INFO: Tamaño de captions = {len(captions)}")

---

### Pre-procesado y tokenizado de los subtítulos

Procedimiento:
* Se convierten en tokens los subtítulos.
* Se limita el tamaño del vocabulario a las 5.000 palabras principales y se reemplazan todas las demás palabras por el token "<unk>" (desconocido).
* Se mapean palabras a índices (word-to-index) e índices a palabras (index-to-word).

In [None]:
# Keep only the 5000 top-ranked words of the vocabulary; any other word is
# mapped to the out-of-vocabulary token "<unk>".
top_k = 5000

# The filter list is a raw string so that the backslash is taken literally:
# "\]" is not a valid escape sequence in a normal string literal and makes
# recent Python versions emit a warning. The resulting characters are the
# same as before. "<" and ">" are not filtered, which keeps the
# <start>/<end> markers intact.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters=r'!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(captions)

In [None]:
# Register index 0 as the "<pad>" token in both lookup tables, so padded
# positions can be mapped to and from text.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

In [None]:
# Convert every caption into its sequence of token ids.
all_seqs = tokenizer.texts_to_sequences(captions)

In [None]:
# Pad the sequences at the end (padding='post') so they share one length.
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(all_seqs, padding='post')

## 5. Split del _dataset_ y crear tf.data del dataset

In [None]:
# Directory that holds the split listing files read by split_file().
split_dir=root_dir+"splits/"


In [None]:
def split_file(split):
    """Return the path of the Karpathy image listing for `split`.

    Args:
        split: split name inserted into the file name (the notebook uses
            'train', 'valid' and 'test').

    Returns:
        The path string `split_dir` + 'karpathy_<split>_images.txt'.
    """
    listing_name = f'karpathy_{split}_images.txt'
    return split_dir + listing_name

In [None]:
def read_split_image_ids_and_paths(split):
    """Read a split listing and return its image ids and image paths.

    The listing is parsed as a header-less, space-separated table whose
    first column is an image path relative to the dataset's `images/`
    folder and whose second column is returned as the ids.

    Args:
        split: split name, forwarded to `split_file`.

    Returns:
        A tuple (ids, paths) of NumPy arrays; the paths are prefixed with
        `root_dir + coco_dir + 'images/'`.
    """
    listing = pd.read_csv(split_file(split), sep=' ', header=None)
    relative_paths = listing.iloc[:, 0]
    image_ids = listing.iloc[:, 1]
    full_paths = root_dir + coco_dir + 'images/' + relative_paths
    return image_ids.to_numpy(), full_paths.to_numpy()

In [None]:
# Group the padded caption vectors by image path.
img_to_cap_vector = collections.defaultdict(list)
for img, cap in zip(img_name_vector, cap_vector):
    img_to_cap_vector[img].append(cap)

img_name_train_keys = read_split_image_ids_and_paths('train')[1]

# Parallel lists with one entry per caption of every training image.
img_name_train = []
cap_train = []

for imgt in img_name_train_keys:
    captions_of_image = img_to_cap_vector[imgt]
    img_name_train += [imgt] * len(captions_of_image)
    cap_train += captions_of_image

In [None]:
# Number of (image, caption) pairs in the training split.
print("INFO: Tamaño del train dataset:", len(img_name_train))

In [None]:
img_name_val_keys = read_split_image_ids_and_paths('valid')[1] 

# Parallel lists with one entry per caption of every validation image.
img_name_val = []
cap_val = []

for imgv in img_name_val_keys:
    captions_of_image = img_to_cap_vector[imgv]
    img_name_val += [imgv] * len(captions_of_image)
    cap_val += captions_of_image

In [None]:
# Number of (image, caption) pairs in the validation split.
print("INFO: Tamaño del val dataset:", len(img_name_val))

In [None]:
img_name_test_keys = read_split_image_ids_and_paths('test')[1]

# The test split keeps one entry per image (no captions are attached).
img_name_test = list(img_name_test_keys)

In [None]:
# Number of images in the test split.
print("INFO: Tamaño del test dataset:", len(img_name_test))

### Create a tf.data dataset for training

In [None]:
# Number of examples per training batch.
BATCH_SIZE = 128
# Size of the shuffle buffer used by the tf.data pipeline.
BUFFER_SIZE = 1000

# Number of full batches in one pass over the training pairs.
num_steps = len(img_name_train) // BATCH_SIZE

print("INFO: Número de steps:", num_steps)

In [None]:
def map_func(img_name, cap):
    """Load the stored features of one image.

    Args:
        img_name: UTF-8 encoded bytes with the image path; the features are
            read from that path with ".npy" appended.
        cap: caption data, returned unchanged.

    Returns:
        A tuple (feature array loaded from disk, cap).
    """
    feature_file = img_name.decode('utf-8') + '.npy'
    return np.load(feature_file), cap

In [None]:
def _load_features(img_name, cap):
    """Wrap map_func so that it can run inside the tf.data pipeline."""
    return tf.numpy_function(map_func, [img_name, cap], [tf.float32, tf.int32])


# Training pipeline: load the stored feature arrays in parallel, shuffle,
# batch and prefetch.
dataset = (
    tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))
    .map(_load_features, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

## 6. Modelo

### _Positional Encoding_

Se inserta información de la posición (_positional encoding_) relativa o absoluta de los tokens de la secuencia para mantener el orden de dicha secuencia. 
En este caso, para el _positional encoding_ se utilizan funciones seno y coseno de diferentes frecuencias.

In [None]:
def get_angles(pos, i, d_model):
    """Compute the positional-encoding angles pos / 10000^(2*(i//2)/d_model).

    Args:
        pos: position index (or array of indices).
        i: dimension index (or array of indices); indices 2k and 2k+1 share
            the same rate because of the integer division by 2.
        d_model: divisor used in the exponent.

    Returns:
        `pos` multiplied by the per-dimension rate, following NumPy
        broadcasting rules.
    """
    exponent = (2 * (i // 2)) / np.float32(d_model)
    return pos * (1 / np.power(10000, exponent))

In [None]:
def positional_encoding_1d(position, d_model):
    """Build the sinusoidal positional encoding for a 1-D sequence.

    Args:
        position: number of positions to encode.
        d_model: size of the encoding of each position.

    Returns:
        A float32 tensor of shape (1, position, d_model).
    """
    positions = np.arange(position)[:, np.newaxis]
    dimensions = np.arange(d_model)[np.newaxis, :]
    table = get_angles(positions, dimensions, d_model)

    even_columns = slice(0, None, 2)
    odd_columns = slice(1, None, 2)
    # Sine on the even columns (2i), cosine on the odd columns (2i+1).
    table[:, even_columns] = np.sin(table[:, even_columns])
    table[:, odd_columns] = np.cos(table[:, odd_columns])

    # Leading axis of size 1 so the table broadcasts over the batch.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

In [None]:
def positional_encoding_2d(row, col, d_model):
    """Build a sinusoidal positional encoding for a row x col grid.

    Each grid cell (flattened row by row) is encoded with d_model // 2
    values derived from its row index followed by d_model // 2 values
    derived from its column index.

    Args:
        row: number of grid rows.
        col: number of grid columns.
        d_model: size of the encoding of each cell; must be even.

    Returns:
        A float32 tensor of shape (1, row * col, d_model).
    """
    assert d_model % 2 == 0

    half = d_model // 2
    channels = np.arange(half)[np.newaxis, :]
    # Row / column index of every cell of the flattened grid.
    row_pos = np.repeat(np.arange(row), col)[:, np.newaxis]
    col_pos = np.repeat(np.expand_dims(np.arange(col), 0), row, axis=0).reshape(-1, 1)

    halves = []
    for cell_positions in (row_pos, col_pos):
        angles = get_angles(cell_positions, channels, half)
        # Sine on the even columns, cosine on the odd columns.
        angles[:, 0::2] = np.sin(angles[:, 0::2])
        angles[:, 1::2] = np.cos(angles[:, 1::2])
        halves.append(angles)

    pos_encoding = np.concatenate(halves, axis=1)[np.newaxis, ...]
    return tf.cast(pos_encoding, dtype=tf.float32)


### _Multi-Head Attention_

En la capa de _“Multi-Head Attention”_ las consultas (Q), claves (K) y valores (V) se proyectan linealmente h veces, utilizando cada vez proyecciones lineales diferentes, adaptadas a las dimensiones dq, dk y dv. 

Para cada una de estas versiones proyectadas se aplica en paralelo la función de atención _“Scaled Dot-Product Attention”_. 

In [None]:
def create_padding_mask(seq):
    """Mark the padded positions (token id 0) of a batch of sequences.

    Args:
        seq: tensor of token ids, indexed as (batch_size, seq_len).

    Returns:
        A float32 tensor of shape (batch_size, 1, 1, seq_len) holding 1.0
        where the token id is 0 and 0.0 elsewhere.
    """
    is_padding = tf.math.equal(seq, 0)
    mask = tf.cast(is_padding, tf.float32)
    # Two singleton axes are inserted so the mask can be broadcast against
    # the attention logits.
    return mask[:, tf.newaxis, tf.newaxis, :]

In [None]:
def create_look_ahead_mask(size):
    """Build the (size, size) mask that hides later positions.

    Returns:
        A tensor with 1 strictly above the main diagonal and 0 on and
        below it.
    """
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle

In [None]:
def scaled_dot_product_attention(q, k, v, mask):
    """Compute softmax(q . k^T / sqrt(depth) + mask * -1e9) . v.

    Args:
      q: queries, shape (..., seq_len_q, depth).
      k: keys, shape (..., seq_len_k, depth).
      v: values, shape (..., seq_len_v, depth_v) with seq_len_v == seq_len_k.
      mask: None, or a float tensor broadcastable to
            (..., seq_len_q, seq_len_k) with 1 at the positions to ignore.

    Returns:
      A tuple (output, attention_weights): the weighted values, shape
      (..., seq_len_q, depth_v), and the weights, shape
      (..., seq_len_q, seq_len_k).
    """
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)

    if mask is not None:
        # A large negative value drives the masked weights to ~0 in softmax.
        logits = logits + (mask * -1e9)

    attention_weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(attention_weights, v), attention_weights

In [None]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on scaled_dot_product_attention.

    Queries, keys and values are each projected to `d_model` units, split
    into `num_heads` heads of `d_model // num_heads` units, attended per
    head, concatenated again and passed through a final dense layer.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers; `d_model` must be divisible by `num_heads`."""
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        
        assert d_model % self.num_heads == 0
        
        # Units handled by each head.
        self.depth = d_model // self.num_heads
        
        # Linear projections of the queries, keys and values.
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        
        # Output projection applied to the concatenated heads.
        self.dense = tf.keras.layers.Dense(d_model)
          
    def split_heads(self, x, batch_size):
        
        """
        Split the last dimension into (num_heads, depth) and transpose the
        result to (batch_size, num_heads, seq_len, depth).
        """
        
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])
      
    def call(self, v, k, q, mask=None):
        """Attend from `q` over `k`/`v`.

        Note the argument order: values first, then keys, then queries.

        Returns:
            A tuple (output, attention_weights); the weights are the ones
            returned by scaled_dot_product_attention.
        """
        batch_size = tf.shape(q)[0]
        
        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)
        
        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)
        
        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        
        # Move the head axis next to depth again and merge both axes.
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)
        concat_attention = tf.reshape(scaled_attention, 
                                      (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)
        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)
            
        return output, attention_weights

In [None]:
def point_wise_feed_forward_network(d_model, dff):
    """Build the two-layer feed-forward sub-network.

    Args:
        d_model: number of units of the output layer.
        dff: number of units of the hidden ReLU layer.

    Returns:
        A `tf.keras.Sequential` with Dense(dff, relu) followed by
        Dense(d_model).
    """
    hidden = tf.keras.layers.Dense(dff, activation='relu')  # (batch_size, seq_len, dff)
    projection = tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([hidden, projection])

### Capa codificador-decodificador

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input (residual connection) and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """Create the sub-layers; `rate` is the dropout rate."""
        super(EncoderLayer, self).__init__()     
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)     
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
      
    def call(self, x, training, mask=None):     
        """Apply the block to `x`; `training` is forwarded to the dropout layers."""
        # Self-attention: x is used as values, keys and queries.
        attn_output, _ = self.mha(x, x, x, mask)  
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)  
        
        ffn_output = self.ffn(out1)  
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)  
        
        return out2

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
    """One decoder block.

    Three sub-layers: masked self-attention, attention over the encoder
    output and a feed-forward network. Each sub-layer output goes through
    dropout, a residual connection and layer normalisation.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """Create the sub-layers; `rate` is the dropout rate."""
        super(DecoderLayer, self).__init__()

        # mha1: self-attention; mha2: attention over the encoder output.
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)


    def call(self, x, enc_output, training, 
               look_ahead_mask=None, padding_mask=None):
        """Apply the block.

        Returns:
            A tuple (output, self-attention weights, encoder-attention
            weights).
        """

        # The look-ahead mask is applied in self-attention so that positions
        # marked with 1 (the future tokens) are NOT attended to.
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)  
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        # Attention over the encoder output: enc_output provides values and
        # keys, out1 the queries; padding_mask (if given) hides encoder
        # positions.
        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)  
        attn2 = self.dropout2(attn2, training=training)
        
        out2 = self.layernorm2(attn2 + out1)  
        
        ffn_output = self.ffn(out2)  
        ffn_output = self.dropout3(ffn_output, training=training)
        
        out3 = self.layernorm3(ffn_output + out2)  
        
        return out3, attn_weights_block1, attn_weights_block2

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Image encoder: dense embedding, 2-D positional encoding and a stack
    of `num_layers` EncoderLayer blocks."""

    def __init__(self, num_layers, d_model, num_heads, dff,
                   row_size,col_size,rate=0.1):
        """Create the sub-layers.

        `row_size` and `col_size` give the grid size used to build the 2-D
        positional encoding; `rate` is the dropout rate.
        """
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        # The input features are projected to d_model units with a ReLU
        # dense layer (there is no token lookup on the encoder side).
        self.embedding = tf.keras.layers.Dense(self.d_model,activation='relu')
        self.pos_encoding = positional_encoding_2d(row_size,col_size, 
                                                self.d_model)


        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) 
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask=None):
        """Encode `x`; axis 1 of `x` is treated as the sequence axis."""

        seq_len = tf.shape(x)[1]

        # Add the embedding and the positional encoding.
        x = self.embedding(x)  
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x  

In [None]:
class Decoder(tf.keras.layers.Layer):
    """Caption decoder: token embedding, 1-D positional encoding and a
    stack of `num_layers` DecoderLayer blocks."""

    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
                   maximum_position_encoding, rate=0.1):
        """Create the sub-layers.

        `maximum_position_encoding` is the number of positions of the
        precomputed positional-encoding table; `rate` is the dropout rate.
        """
        
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding_1d(maximum_position_encoding, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) 
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, 
               look_ahead_mask=None, padding_mask=None):
        """Decode the token ids `x` against `enc_output`.

        Returns:
            A tuple (output, attention_weights) where attention_weights is
            a dict keyed 'decoder_layer{n}_block1' / 'decoder_layer{n}_block2'
            (n starting at 1) with the two attention maps of every layer.
        """

        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)  
        # Scale the embeddings by sqrt(d_model) before adding the
        # positional encoding.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                                 look_ahead_mask, padding_mask)

            attention_weights['decoder_layer{}_block1'.format(i+1)] = block1
            attention_weights['decoder_layer{}_block2'.format(i+1)] = block2

        return x, attention_weights

### _Transformer_

In [None]:
class Transformer(tf.keras.Model):
    """Encoder-decoder model: Encoder, Decoder and a final dense layer
    with `target_vocab_size` units (no activation)."""

    def __init__(self, num_layers, d_model, num_heads, dff,row_size,col_size, 
                   target_vocab_size,max_pos_encoding, rate=0.1):
        """Create the encoder, the decoder and the output layer.

        `row_size`/`col_size` are forwarded to the encoder and
        `max_pos_encoding` to the decoder.
        """
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers, d_model, num_heads, dff,row_size,col_size, rate)

        self.decoder = Decoder(num_layers, d_model, num_heads, dff, 
                               target_vocab_size,max_pos_encoding, rate)

        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, training,look_ahead_mask=None, dec_padding_mask=None,enc_padding_mask=None):
        """Run the model.

        Args:
            inp: encoder input.
            tar: decoder input token ids.
            training: forwarded to the dropout layers.
            look_ahead_mask: mask for the decoder self-attention.
            dec_padding_mask: mask for the decoder attention over the
                encoder output.
            enc_padding_mask: mask for the encoder self-attention.

        Returns:
            A tuple (final_output, attention_weights): the output of the
            final dense layer and the decoder's attention-weight dict.
        """

        enc_output = self.encoder(inp, training, enc_padding_mask)  

        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, look_ahead_mask, dec_padding_mask)

        final_output = self.final_layer(dec_output)  

        return final_output, attention_weights

### Hiperparámetros del modelo

In [None]:
# Number of stacked encoder layers and of stacked decoder layers.
num_layer = 4
# Width of the model (embeddings and attention projections).
d_model = 512
# Hidden units of the feed-forward sub-networks.
dff = 2048
# Attention heads; d_model must be divisible by it.
num_heads = 8
# Grid size used for the encoder's 2-D positional encoding.
row_size = 8
col_size = 8
# Output vocabulary: the top_k tokenizer entries plus one.
# NOTE(review): presumably the extra slot accounts for index 0 ('<pad>') - confirm.
target_vocab_size = top_k + 1
dropout_rate = 0.1

In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule with warm-up followed by inverse-sqrt decay.

    lr(step) = d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5)

    Args:
        d_model: model width used to scale the rate.
        warmup_steps: number of steps during which the rate grows linearly.
    """

    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()

        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        """Return the learning rate for the given optimizer step."""
        # The optimizer may pass its integer iteration counter; rsqrt and
        # the multiplication below need a floating-point tensor.
        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

In [None]:
# Adam driven by the warm-up schedule defined by CustomSchedule.
learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, 
                                     epsilon=1e-9)

In [None]:
# Per-token cross-entropy on logits. reduction='none' keeps one loss value
# per position so that padded positions can be masked out afterwards.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

In [None]:
def loss_function(real, pred):
    """Return the mean cross-entropy over the non-padded positions.

    Args:
        real: target token ids; id 0 marks padding.
        pred: predicted logits for every position.

    Returns:
        Sum of the per-token losses at the positions where `real` != 0,
        divided by the number of such positions.
    """
    per_token_loss = loss_object(real, pred)

    is_real_token = tf.math.logical_not(tf.math.equal(real, 0))
    weights = tf.cast(is_real_token, dtype=per_token_loss.dtype)

    return tf.reduce_sum(per_token_loss * weights) / tf.reduce_sum(weights)

In [None]:
# Running metrics, updated in train_step and reset at every epoch.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
    name='train_accuracy')

In [None]:
# Build the model from the hyperparameters above.
# NOTE(review): max_pos_encoding is set to target_vocab_size, so the decoder's
# positional table is sized by the vocabulary rather than by the caption
# length - confirm this is intended.
transformer = Transformer(num_layer,d_model,num_heads,dff,row_size,col_size,target_vocab_size,max_pos_encoding=target_vocab_size,rate=dropout_rate)

In [None]:
def create_masks_decoder(tar):
    """Build the decoder self-attention mask for the target batch `tar`.

    The result is the element-wise maximum of the padding mask of `tar`
    and the look-ahead mask for its sequence length, so a position is
    masked when it is padding or lies in the future.
    """
    target_len = tf.shape(tar)[1]
    padding_mask = create_padding_mask(tar)
    future_mask = create_look_ahead_mask(target_len)
    return tf.maximum(padding_mask, future_mask)

## 7. Training

In [None]:
# Container for loss values.
# NOTE(review): nothing in the visible code appends to this list.
loss_plot = []

In [None]:
def train_step(img_tensor, tar):
    """Run one optimisation step on a batch and update the running metrics.

    Args:
        img_tensor: batch of image features fed to the encoder.
        tar: batch of padded caption token ids (0 marks padding).
    """
    # Teacher forcing: the decoder sees tokens [0, n-1) and has to predict
    # tokens [1, n).
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]

    dec_mask = create_masks_decoder(tar_inp)

    with tf.GradientTape() as tape:
        predictions, _ = transformer(img_tensor, tar_inp,
                                     True,
                                     dec_mask)
        loss = loss_function(tar_real, predictions)

    gradients = tape.gradient(loss, transformer.trainable_variables)

    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

    train_loss(loss)
    # Padded positions are excluded from the accuracy, as they are from the
    # loss; counting them would distort the reported value.
    non_padding = tf.cast(tf.math.logical_not(tf.math.equal(tar_real, 0)), tf.float32)
    train_accuracy(tar_real, predictions, sample_weight=non_padding)

In [None]:
# Train for 20 epochs, reporting the running metrics every 50 batches and
# at the end of each epoch.
for epoch in tqdm(range(20)):
    epoch_number = epoch + 1
    start = time.time()

    # Start every epoch with fresh running metrics.
    for metric in (train_loss, train_accuracy):
        metric.reset_states()

    for batch, (img_tensor, tar) in enumerate(dataset):
        train_step(img_tensor, tar)

        if batch % 50 != 0:
            continue
        print ('Epoch {} Batch {} Loss {:.4f} Accuracy {:.4f}'.format(
            epoch_number, batch, train_loss.result(), train_accuracy.result()))

    print ('Epoch {} Loss {:.4f} Accuracy {:.4f}'.format(
        epoch_number, train_loss.result(), train_accuracy.result()))

    print ('Time taken for 1 epoch: {} secs\n'.format(time.time() - start))

In [None]:
# Timestamp (YYYYMMDD-HHMMSS) used as suffix of the files written below.
date=str(datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [None]:
# Create the target folder up front so that saving does not fail when the
# notebook runs in a checkout where 'model/' does not exist yet.
os.makedirs('model', exist_ok=True)
transformer.save_weights('model/image_caption_transformer'+date+'.h5')

## 8. Generar descripción 

In [None]:
def generate(image, max_length=100):
    """Greedily decode a caption for the image stored at `image`.

    Args:
        image: path of the JPEG file to describe.
        max_length: maximum number of decoding steps. Defaults to 100, the
            limit that was previously hard-coded.

    Returns:
        A tuple (result, output, attention_weights):
        - result: list of predicted words; the <start> and <end> markers
          are never part of it.
        - output: 1-D tensor with the token ids fed to the decoder,
          beginning with the <start> id.
        - attention_weights: attention weights of the last decoding step
          (None when no step was run).
    """
    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    # Flatten the spatial axes: (1, positions, channels).
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    start_token = tokenizer.word_index['<start>']
    end_token = tokenizer.word_index['<end>']

    # The decoder input starts with the <start> token only.
    output = tf.expand_dims([start_token], 0)
    result = []
    attention_weights = None

    for _ in range(max_length):
        dec_mask = create_masks_decoder(output)

        predictions, attention_weights = transformer(img_tensor_val, output, False, dec_mask)

        # Only the prediction for the last position is needed.
        predictions = predictions[:, -1:, :]
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

        # Stop before storing <end>, so it never reaches `result`.
        if predicted_id == end_token:
            break

        result.append(tokenizer.index_word[int(predicted_id)])
        output = tf.concat([output, predicted_id], axis=-1)

    return result, tf.squeeze(output, axis=0), attention_weights

### Ejemplos de imágenes con la descripción generada

In [None]:
start_token = tokenizer.word_index['<start>']
end_token = tokenizer.word_index['<end>']
# Pick a random (image, caption) pair from the validation split.
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
caption,result,attention_weights = generate(image)

# Remove "<unk>" by building a new list: calling list.remove() while
# iterating skips the element after each removal, so consecutive "<unk>"
# tokens were left behind.
caption = [word for word in caption if word != "<unk>"]

# Reference caption without padding (id 0) and without the
# <start>/<end>/<unk> markers.
reference_words = [tokenizer.index_word[i] for i in cap_val[rid] if i != 0]
real_caption = ' '.join(
    word for word in reference_words
    if word not in ("<start>", "<end>", "<unk>"))

print ('Descripción de referencia:', real_caption)
# generate() stops before storing <end>, so the whole list is the
# prediction; slicing with [:-1] dropped the last real word.
print ('Descripción resultante:', ' '.join(caption))
temp_image = np.array(Image.open(image))
plt.imshow(temp_image)
plt.axis('off')


In [None]:
start_token = tokenizer.word_index['<start>']
end_token = tokenizer.word_index['<end>']
# Pick a random (image, caption) pair from the validation split.
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
caption,result,attention_weights = generate(image)

# Remove "<unk>" by building a new list: calling list.remove() while
# iterating skips the element after each removal, so consecutive "<unk>"
# tokens were left behind.
caption = [word for word in caption if word != "<unk>"]

# Reference caption without padding (id 0) and without the
# <start>/<end>/<unk> markers.
reference_words = [tokenizer.index_word[i] for i in cap_val[rid] if i != 0]
real_caption = ' '.join(
    word for word in reference_words
    if word not in ("<start>", "<end>", "<unk>"))

print ('Descripción de referencia:', real_caption)
# generate() stops before storing <end>, so the whole list is the
# prediction; slicing with [:-1] dropped the last real word.
print ('Descripción resultante:', ' '.join(caption))
temp_image = np.array(Image.open(image))
plt.imshow(temp_image)
plt.axis('off')


In [None]:
start_token = tokenizer.word_index['<start>']
end_token = tokenizer.word_index['<end>']
# Pick a random (image, caption) pair from the validation split.
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
caption,result,attention_weights = generate(image)

# Remove "<unk>" by building a new list: calling list.remove() while
# iterating skips the element after each removal, so consecutive "<unk>"
# tokens were left behind.
caption = [word for word in caption if word != "<unk>"]

# Reference caption without padding (id 0) and without the
# <start>/<end>/<unk> markers.
reference_words = [tokenizer.index_word[i] for i in cap_val[rid] if i != 0]
real_caption = ' '.join(
    word for word in reference_words
    if word not in ("<start>", "<end>", "<unk>"))

print ('Descripción de referencia:', real_caption)
# generate() stops before storing <end>, so the whole list is the
# prediction; slicing with [:-1] dropped the last real word.
print ('Descripción resultante:', ' '.join(caption))
temp_image = np.array(Image.open(image))
plt.imshow(temp_image)
plt.axis('off')


In [None]:
start_token = tokenizer.word_index['<start>']
end_token = tokenizer.word_index['<end>']
# Pick a random (image, caption) pair from the validation split.
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
caption,result,attention_weights = generate(image)

# Remove "<unk>" by building a new list: calling list.remove() while
# iterating skips the element after each removal, so consecutive "<unk>"
# tokens were left behind.
caption = [word for word in caption if word != "<unk>"]

# Reference caption without padding (id 0) and without the
# <start>/<end>/<unk> markers.
reference_words = [tokenizer.index_word[i] for i in cap_val[rid] if i != 0]
real_caption = ' '.join(
    word for word in reference_words
    if word not in ("<start>", "<end>", "<unk>"))

print ('Descripción de referencia:', real_caption)
# generate() stops before storing <end>, so the whole list is the
# prediction; slicing with [:-1] dropped the last real word.
print ('Descripción resultante:', ' '.join(caption))
temp_image = np.array(Image.open(image))
plt.imshow(temp_image)
plt.axis('off')


In [None]:
def f_create_json(img_name, split_val ):
    """Generate a caption for every image and dump the results as JSON.

    Writes 'output/transformer-tf-<date>-predictions.json' with one
    {'image_id', 'caption'} entry per image. When `split_val` is true it
    also writes 'output/transformer-tf-<date>-true.json' with the
    reference captions taken from `cap_val` at the same positions.

    Args:
        img_name: iterable of image paths whose file names look like
            'COCO_(train|val)2014_<number>'.
        split_val: whether `img_name` is the validation list, i.e. whether
            `cap_val[idx]` is the reference caption of `img_name[idx]`.
    """
    # Compiled once; it does not change between images.
    id_pattern = re.compile(r'(?P<prefix>COCO_(train|val)2014_)(?P<number>[0-9]+)')

    list_pred = []
    list_true= []

    for idx, image in enumerate(tqdm(img_name)):
        img_id = int(id_pattern.match(Path(image).stem).group('number'))
        caption_list, _, _  = generate(image)

        # Build a new list instead of calling remove() while iterating,
        # which skips the element that follows each removal.
        words = [word for word in caption_list if word != "<unk>"]

        # generate() never stores <end>, so every remaining word belongs to
        # the caption; slicing with [:-1] dropped the last real word.
        list_pred.append({'image_id': img_id, 'caption': ' '.join(words)})

        if split_val:
            reference = ' '.join([tokenizer.index_word[i] for i in cap_val[idx] if i not in [0]])
            list_true.append({'image_id': img_id, 'caption': reference})

    # Make sure the output folder exists before writing into it.
    os.makedirs('output', exist_ok=True)
    full_file_name = 'output/transformer-tf-'+date

    with open(full_file_name+'-predictions.json', 'w') as f:
        json.dump(list_pred, f)

    print('Archivo con las predicciones:', full_file_name+'-predictions.json')

    if split_val:
        with open(full_file_name+'-true.json', 'w') as f:
            json.dump(list_true, f)
            print('Archivo con las referencias:',full_file_name+'-true.json')

### Test Dataset

In [None]:
# Write the JSON file with the generated captions of the test split
# (split_val=False: no reference file is written).
f_create_json(img_name_test, split_val = False)