In [None]:
import re
import random

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow import keras
from time import time

from tqdm import tqdm # progress bar
from sklearn.model_selection import train_test_split # Dividing train test
from nltk.translate.bleu_score import corpus_bleu # BLEU Score

In [None]:
# import tensorflow.compat.v1 as tf 

# tf.enable_eager_execution(tf.ConfigProto(log_device_placement=True)) 
# Enables eager execution in TensorFlow and logs device placement.
# Eager execution evaluates operations immediately instead of building a graph first, which gives a more interactive workflow, closer to ordinary interactive programming.
# Computations run immediately and their results are returned right away.
# The tf.ConfigProto(log_device_placement=True) argument is optional; it prints a log showing which device each TensorFlow operation runs on, which helps with performance tuning and with understanding how the hardware's processing capacity is used.

# print(tf.add([1.0, 2.0], [3.0, 4.0]))

In [None]:
dataset_path = "data" # root folder of the dataset
dataset_images_path = dataset_path + "/Images/"  # folder holding the image files

In [None]:
img_height = 180 # target image height used when resizing in load_image
img_width = 180 # target image width used when resizing in load_image
validation_split = 0.2 # fraction of images held out by train_test_split (80% train / 20% test)

In [None]:
# Remove the last layer of the Inception V3 model
def get_encoder():
    """Build the image feature extractor from an ImageNet-pretrained InceptionV3.

    The network is created without its classification top
    (``include_top=False``) and wrapped in a new model that exposes the
    output of its last remaining layer.

    Returns:
        A ``tf.keras.Model`` mapping the InceptionV3 input to the output of
        the last layer of the headless network.
    """
    backbone = tf.keras.applications.InceptionV3(weights='imagenet', include_top=False)
    return tf.keras.Model(backbone.input, backbone.layers[-1].output)

# Preprocess the caption, splitting the string and adding <start> and <end> tokens
def get_preprocessed_caption(caption):
    """Normalise the whitespace of a caption and wrap it in sequence markers.

    Every run of whitespace is collapsed to a single space, leading and
    trailing whitespace is removed, and the result is surrounded by the
    ``<start>`` and ``<end>`` tokens.

    Args:
        caption: Raw caption string.

    Returns:
        The caption as ``"<start> ... <end>"``.
    """
    collapsed = re.sub(r'\s+', ' ', caption).strip()
    return f"<start> {collapsed} <end>"

In [None]:
images_captions_dict = {} # image filename -> list of preprocessed captions

with open(dataset_path + "/captions.txt", "r") as dataset_info:
    next(dataset_info, None) # Omit header: image, caption (tolerates an empty file)

    # Using a subset of 4,000 entries out of 40,000
    for entry_number, info_raw in enumerate(dataset_info):
        if entry_number >= 4000:
            break

        # Split on the FIRST comma only: the filename never contains a comma,
        # but the caption may, and splitting on every comma would silently
        # truncate such captions to their first clause.
        image_filename, separator, raw_caption = info_raw.partition(",")
        if not separator:
            # Blank or malformed line without a caption column: skip it
            continue

        caption = get_preprocessed_caption(raw_caption)

        # Create the caption list on first sight of the image, then append
        images_captions_dict.setdefault(image_filename, []).append(caption)

In [None]:
# Load an image
def load_image(image_path):
    """Read, decode, resize and preprocess one image for InceptionV3.

    Args:
        image_path: File name of the image, relative to ``dataset_images_path``.

    Returns:
        A tuple ``(image, image_path)``: the preprocessed image tensor and the
        unchanged ``image_path`` that was passed in.
    """
    raw_bytes = tf.io.read_file(dataset_images_path + image_path)
    # Decode the JPEG bytes into a 3-channel image tensor
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(pixels, (img_height, img_width))
    # Preprocessing required by the pre-trained model
    prepared = tf.keras.applications.inception_v3.preprocess_input(resized)
    return prepared, image_path

In [None]:

# Distinct image filenames that have at least one caption
image_captions_dict_keys = list(images_captions_dict)

# tf.data pipeline: load and preprocess the images in parallel, in batches of 64
image_dataset = (
    tf.data.Dataset.from_tensor_slices(image_captions_dict_keys)
    .map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(64)
)

In [None]:
images_dict = {} # image filename -> feature array produced by the pre-trained network

# NOTE: `encoder` is the InceptionV3 feature extractor here; the same name is
# rebound to a CNN_Encoder instance further down the notebook.
encoder = get_encoder()

# tqdm shows a progress bar while iterating over the batched image dataset
for img_tensor, path_tensor in tqdm(image_dataset):
    batch_features_tensor = encoder(img_tensor)

    # Store every element of the batch under its decoded (bytes -> str) filename
    images_dict.update({
        path.numpy().decode("utf-8"): batch_features.numpy()
        for batch_features, path in zip(batch_features_tensor, path_tensor)
    })

In [None]:
list(images_dict.items())[0][1].shape # shape of the feature array stored for the first image

In [None]:
# load_image returns pixels already preprocessed for InceptionV3, i.e. scaled
# to [-1, 1]. plt.imshow expects float RGB data in [0, 1] and clips anything
# outside that range, so map the values back to [0, 1] before displaying.
sample_image = load_image('1000268201_693b08cb0e.jpg')[0].numpy()
plt.imshow((sample_image + 1.0) / 2.0);

In [None]:
# Build the (features, caption) training pairs
def get_images_labels(image_filenames):
    """Expand image filenames into parallel lists of features and captions.

    Each image contributes one entry per caption, so an image with several
    captions appears several times in ``images`` (always the same feature
    object) next to each of its captions in ``labels``.

    Args:
        image_filenames: Iterable of filenames present in both ``images_dict``
            and ``images_captions_dict``.

    Returns:
        A tuple ``(images, labels)`` of two lists of equal length.
    """
    images, labels = [], []

    for name in image_filenames:
        features = images_dict[name]
        captions = images_captions_dict[name]

        # One instance of the image features per caption
        images.extend([features] * len(captions))
        labels.extend(captions)

    return images, labels

In [None]:
# Split by image filename (not by caption) into train and test sets
image_filenames = list(images_captions_dict)
image_filenames_train, image_filenames_test = train_test_split(
    image_filenames,
    test_size=validation_split,
    random_state=1,
)

# Expand each split into parallel (features, caption) lists
X_train, y_train_raw = get_images_labels(image_filenames_train)
X_test, y_test_raw = get_images_labels(image_filenames_test)

In [None]:
# Sizes of the train and test pair lists (expected: about 5 captions per image, 0.2 test split)
len(X_train), len(y_train_raw), len(X_test), len(y_test_raw)

In [None]:

top_k = 5000 # Take maximum of words out of 7600

# Vectorises text into sequences of integer word indices.
# The filter set omits '<' and '>', so the special tokens (<start>, <end>,
# <unk>) survive tokenisation. The backslash is written as '\\' because a bare
# '\]' is an invalid escape sequence; the resulting string is unchanged.
tokenizer = tf.keras.preprocessing.text.Tokenizer(
    num_words=top_k,
    oov_token="<unk>",
    filters='!"#$%&()*+.,-/:;=?@[\\]^_`{|}~ ',
)

# Generate vocabulary from train captions
tokenizer.fit_on_texts(y_train_raw)

# Introduce padding to make the captions of the same size for the LSTM model
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

# Create the tokenized vectors
y_train = tokenizer.texts_to_sequences(y_train_raw)

# Add padding to each vector to the max_length of the captions (automatically done)
y_train = tf.keras.preprocessing.sequence.pad_sequences(y_train, padding='post')

In [None]:
# Longest tokenised training caption (rows of y_train are already padded)
max_caption_length = max(map(len, y_train))
print(max_caption_length)

In [None]:
[tokenizer.index_word[i] for i in y_train[1]] # map the token ids of the second training caption back to words

In [None]:
dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
# Build a tf.data.Dataset of (image features, tokenised caption) pairs from X_train and y_train

In [None]:
BUFFER_SIZE = len(X_train) # shuffle buffer size: the whole training set
BATCH_SIZE = 64 # number of samples per batch
NUM_STEPS = BUFFER_SIZE // BATCH_SIZE # number of full batches per epoch (floor division)

# Shuffle and batch
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

# Using prefetching: https://www.tensorflow.org/guide/data_performance#prefetching
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
class CNN_Encoder(tf.keras.Model):
    """Projects pre-extracted image features into the embedding space.

    The convolutional features are computed beforehand; this model only
    flattens them and applies one linear fully connected layer.

    Args:
        embedding_dim: Number of output units of the fully connected layer.
    """

    def __init__(self, embedding_dim):
        super().__init__()
        # Flattens everything except the batch axis
        self.flat = tf.keras.layers.Flatten()
        # Linear projection (no activation) to embedding_dim units
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Return the projected features, shape (batch_size, embedding_dim)."""
        return self.fc(self.flat(x))

In [None]:
class RNN_Decoder(tf.keras.Model):
    """LSTM caption decoder conditioned on an image embedding.

    The image embedding is fed to the LSTM as the first time step, followed
    by the embedded caption tokens; a dense layer maps every LSTM output to
    vocabulary logits.

    Args:
        embedding_dim: Size of the token embeddings.
        units: Number of LSTM units.
        vocab_size: Vocabulary size; used as the embedding input dimension and
            as the number of output logits.
    """

    def __init__(self, embedding_dim, units, vocab_size):
        super().__init__()
        self.units = units

        # input_dim = size of the vocabulary
        # Embedding layer that transforms the input caption sequence
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)

        # LSTM layer that predicts the next words in the sequence; it returns
        # the full output sequence plus its final memory and carry states
        self.lstm = tf.keras.layers.LSTM(self.units, return_sequences=True, return_state=True)

        # Dense layer that turns each LSTM output into logits over the vocabulary
        self.fc = tf.keras.layers.Dense(vocab_size)

    @staticmethod
    def _log_shape(verbose, label, tensor):
        """Print ``label`` and the shape of ``tensor`` when ``verbose`` is set."""
        if verbose:
            print(label)
            print(tensor.shape)

    def call(self, captions, features, omit_features = False, initial_state = None, verbose = False):
        """Run the decoder over a token sequence.

        Args:
            captions: Token ids, assumed to be (batch_size, seq_len) - TODO confirm.
            features: Image embedding, assumed to be (batch_size, embedding_dim)
                - TODO confirm. Its last dimension must match the token
                embedding size for the concatenation below.
            omit_features: When truthy, the image embedding is not prepended
                and only the embedded tokens are fed to the LSTM.
            initial_state: Optional initial LSTM state, forwarded to the layer.
            verbose: When truthy, print intermediate tensor shapes.

        Returns:
            A tuple ``(output, memory_state, carry_state)``: per-step logits
            over the vocabulary and the two final LSTM states.
        """
        self._log_shape(verbose, "Before embedding", captions)

        embed = self.embedding(captions) # (batch_size, seq_len, embedding_dim)
        self._log_shape(verbose, "Embed", embed)

        # Insert a time axis so the image embedding can act as one sequence step
        features = tf.expand_dims(features, 1)
        self._log_shape(verbose, "Features", features)

        # Prepend the image embedding along the time axis (axis=-2) unless the
        # caller asked to omit it; the sequence grows by one step, the feature
        # dimension is unchanged.
        lstm_input = embed if omit_features else tf.concat([features, embed], axis=-2)
        self._log_shape(verbose, "LSTM input", lstm_input)

        # Pass the sequence to the LSTM
        output, memory_state, carry_state = self.lstm(lstm_input, initial_state=initial_state)
        self._log_shape(verbose, "LSTM output", output)

        # Transform LSTM output units to vocab_size
        output = self.fc(output)

        return output, memory_state, carry_state

    def reset_state(self, batch_size):
        """Return a zero tensor of shape (batch_size, units)."""
        return tf.zeros((batch_size, self.units))

In [None]:
units = embedding_dim = 512 # As in the paper
vocab_size = min(top_k + 1, len(tokenizer.word_index.keys())) # cap the vocabulary size used by the decoder at top_k + 1

# Initialize encoder and decoder
encoder = CNN_Encoder(embedding_dim) # flatten + dense projection of the pre-extracted image features
decoder = RNN_Decoder(embedding_dim, units, vocab_size) # recurrent network that produces caption logits from the encoder output

# Initialize optimizer
optimizer = tf.keras.optimizers.Adam()

# As the label is not one-hot encoded but indices. Logits as they are not probabilities.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

# Computes the loss using SCCE and calculates the average of singular losses in the tensor
def loss_function(real, pred, verbose=False):
    """Compute one loss value per caption.

    The element-wise loss from ``loss_object`` is averaged along axis 1.
    No mask is applied here, so every position of ``real`` takes part in the
    mean.

    Args:
        real: Target token ids.
        pred: Predicted logits for the same positions.
        verbose: When truthy, print the intermediate loss tensors.

    Returns:
        The loss averaged over axis 1.
    """
    per_token_loss = loss_object(real, pred)
    if verbose:
        print("Loss")
        print(per_token_loss)

    # Average the individual losses along axis 1
    per_caption_loss = tf.reduce_mean(per_token_loss, axis=1)
    if verbose:
        print("After Mean Axis 1")
        print(per_caption_loss)

    return per_caption_loss
# Key Point: Any Python side-effects (appending to a list, printing with print, etc) will only happen once, when func is traced. 
# To have side-effects executed into your tf.function they need to be written as TF ops:
@tf.function
def train_step(img_tensor, target, verbose=False):
    """Run one optimisation step on a batch.

    Args:
        img_tensor: Batch of pre-extracted image features.
        target: Batch of padded token-id sequences.
        verbose: When truthy, print shapes and intermediate tensors.

    Returns:
        A tuple ``(total_batch_loss, mean_batch_loss)``: the sum and the mean
        of the per-caption losses of the batch.
    """
    if verbose:
        print("Image tensor")
        print(img_tensor.shape)

        print("Target")
        print(target.shape)

    # The decoder input is every caption without its last token, which leaves
    # room for the image embedding that the decoder prepends as first step
    dec_input = tf.convert_to_tensor(target[:, :-1])

    # Source: https://www.tensorflow.org/api_docs/python/tf/GradientTape
    with tf.GradientTape() as tape:
        features = encoder(img_tensor)
        if verbose:
            print("Features CNN")
            print(features)

        # Forward pass through the decoder
        predictions, _, _ = decoder(dec_input, features, verbose=verbose)
        if verbose:
            print("Predictions RNN")
            print(predictions)

        caption_loss = loss_function(target, predictions) # (batch_size, )

    # Reporting values only; they are not differentiated, so they live outside the tape
    total_batch_loss = tf.reduce_sum(caption_loss) # Sum (batch_size, ) => K
    mean_batch_loss = tf.reduce_mean(caption_loss) # Mean(batch_size, ) => K

    # Update the variables. caption_loss is a vector; tape.gradient
    # differentiates the sum of a non-scalar target.
    trainable_variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(caption_loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return total_batch_loss, mean_batch_loss

In [None]:
checkpoint_path = "./checkpoints/train" # directory where checkpoints are written
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer) # groups the objects whose state is saved and restored
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5) # manages the checkpoints, keeping at most 5

In [None]:
start_epoch = 0 # default when no checkpoint exists: start from the first epoch
if ckpt_manager.latest_checkpoint: # a previous checkpoint is available
    # NOTE(review): this is the numeric suffix of the checkpoint name; whether it equals an epoch count depends on how ckpt_manager.save is called - confirm
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1]) # parse the number after the last '-'
    # restoring the latest checkpoint in checkpoint_path
    ckpt.restore(ckpt_manager.latest_checkpoint)

In [None]:
loss_plot = []
EPOCHS = 20
# start_epoch comes from the checkpoint-restore cell above. It is deliberately
# NOT reset to 0 here: doing so would retrain all epochs on top of restored
# weights and make the restore cell's bookkeeping pointless.

for epoch in range(start_epoch, EPOCHS):
    # 1-based epoch number; derived from `epoch` so it stays correct when
    # training resumes from a checkpoint (loss_plot starts empty on every run)
    real_epoch = epoch + 1
    start = time()
    total_loss = 0

    for (batch, (img_tensor, target)) in enumerate(dataset):
        # Runs one optimisation step on the batch
        total_batch_loss, mean_batch_loss = train_step(img_tensor, target, verbose=False)
        total_loss += total_batch_loss

        if batch % 100 == 0:
            print ('Epoch {} Batch {} Batch Loss {:.4f}'.format(real_epoch, batch, mean_batch_loss.numpy()))

    print ('Total Loss {:.6f}'.format(float(total_loss)))
    # NOTE: total_loss is a sum of per-caption losses, so dividing by the
    # number of steps gives roughly BATCH_SIZE times the mean per-caption loss
    epoch_loss = total_loss / NUM_STEPS

    # storing the epoch end loss value to plot later
    loss_plot.append(epoch_loss)

    if epoch % 5 == 0:
        # Number the checkpoint with the count of completed epochs so the
        # restore cell, which parses this number, resumes at the next epoch
        ckpt_manager.save(checkpoint_number=real_epoch)

    print ('Epoch {} Epoch Loss {:.6f}'.format(real_epoch, float(epoch_loss)))
    print ('Time taken for 1 epoch {} sec\n'.format(time() - start))

In [None]:
# Plot the per-epoch training loss collected in loss_plot
fig, ax = plt.subplots()
ax.plot(loss_plot)
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Loss Plot')
plt.show()

In [None]:
def clean_caption(caption):
    """Remove the special tokens from a caption.

    Takes the list of words of a caption and drops the ``<start>``, ``<end>``
    and ``<pad>`` tokens, which mark the beginning, the end and the padding of
    a sequence.

    Args:
        caption: Iterable of word strings.

    Returns:
        A new list with the remaining words in their original order.
    """
    special_tokens = ('<start>', '<end>', '<pad>')
    return list(filter(lambda token: token not in special_tokens, caption))

In [None]:
# NOTE(review): the filename is drawn from the TRAINING split although the variable is named "test" - confirm this is intended
test_img_name = random.choice(image_filenames_train) # pick one image filename at random

In [None]:
# Get captions from a test image
def get_caption(img):
    """Greedily decode a caption from the pre-extracted features of one image.

    The image embedding is fed to the decoder as the first step; afterwards
    each predicted word and the LSTM states are fed back to predict the next
    word. Decoding stops when ``<end>`` is predicted or after
    ``max_caption_length - 1`` steps, whichever comes first.

    Args:
        img: Feature array of a single image (no batch axis).

    Returns:
        The predicted words as a list, without special tokens.
    """
    # Add a leading axis to simulate a batch size of 1
    features = encoder(tf.expand_dims(img, 0))

    caption = []
    # Empty token sequence with a batch axis: the first step is driven by the image only
    dec_input = tf.expand_dims([], 0)

    state = None
    for i in range(1, max_caption_length):
        # The image embedding is only prepended on the first step (i == 1)
        predictions, memory_state, carry_state = decoder(
            dec_input, features, omit_features=i > 1, initial_state=state)

        # Greedy choice: index of the highest-scoring word
        word_index = np.argmax(predictions.numpy().flatten())
        word = tokenizer.index_word[word_index]

        # Stop at the end-of-sequence token; anything generated after it is
        # not part of the caption
        if word == '<end>':
            break

        caption.append(word)

        # Feed the predicted word and the LSTM states back for the next step
        dec_input = tf.expand_dims([word_index], 0)
        state = [memory_state, carry_state]

    # Filter caption
    return clean_caption(caption)

raw_img = load_image(test_img_name)[0] # preprocessed image tensor, used for display only
img = images_dict[test_img_name] # pre-extracted features of the image
captions = images_captions_dict[test_img_name] # reference captions of the image

# load_image returns pixels scaled to [-1, 1] for InceptionV3; map them back to
# [0, 1], the range plt.imshow expects for float RGB data, to avoid clipping
plt.imshow((raw_img.numpy() + 1.0) / 2.0)

print("Real captions")
for caption in captions:
    print(caption)

print("Esimated caption")
estimated_caption = get_caption(img) # list of predicted words
print(estimated_caption)