In [None]:
import joblib
import os
import tensorflow as tf
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input, Embedding, GRU, Concatenate, BatchNormalization, Dropout
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pandas as pd
import numpy as np
import cv2
from nltk.translate.bleu_score import sentence_bleu

# Mount Google Drive so the weight file stored there can be read.
# NOTE(review): this import is Colab-specific and sits outside the main import
# group above; the notebook cannot run outside Colab as written.
from google.colab import drive
drive.mount('/content/drive')

# Path to the pre-trained CheXNet weights (.h5 file on the mounted Drive).
# It is bound as the default argument of create_chexnet() at definition time.
chexnet_weights = "/content/drive/MyDrive/Hackathons/MedAIVision/Model/brucechou1983_CheXNet_Keras_0.3.0_weights.h5"

# Função para criar o CheXNet
def create_chexnet(chexnet_weights=chexnet_weights, input_size=(224, 224)):
    """Build a DenseNet121 feature extractor initialised from CheXNet weights.

    A 14-unit sigmoid classifier head is attached to DenseNet121 only so that
    the layer layout matches the weight file; after loading, the head is
    dropped again and the returned model ends at the layer that feeds the
    global-average-pooling layer.

    Args:
        chexnet_weights: Path to the CheXNet weight file passed to
            ``load_weights``. Defaults to the module-level path captured when
            this function was defined.
        input_size: ``(height, width)`` of the input images; a 3-channel axis
            is appended. Any sequence of two ints is accepted.

    Returns:
        A ``tf.keras.Model`` mapping an image batch to the backbone's final
        feature map (no pooling, no classifier).
    """
    # weights=None: load_weights() below overwrites every layer, so letting
    # DenseNet121 fetch its default ImageNet weights first is a wasted download.
    model = tf.keras.applications.DenseNet121(
        include_top=False,
        weights=None,
        input_shape=tuple(input_size) + (3,),
    )
    x = model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(14, activation="sigmoid", name="chexnet_output")(x)
    chexnet = tf.keras.Model(inputs=model.input, outputs=x)
    chexnet.load_weights(chexnet_weights)
    # layers[-1] is the Dense head and layers[-2] the pooling layer, so
    # layers[-3] is the last backbone layer.
    chexnet = tf.keras.Model(inputs=model.input, outputs=chexnet.layers[-3].output)
    return chexnet

# Camada do Codificador de Imagens
class Image_encoder(tf.keras.layers.Layer):
    """Frozen CheXNet backbone that turns an image batch into a sequence of feature vectors.

    The backbone feature map is average-pooled and its two spatial axes are
    flattened into one, giving a tensor of shape
    ``(batch, positions, channels)``.
    """

    def __init__(self, name="image_encoder_block"):
        # NOTE(review): `name` is accepted but never forwarded to the base
        # class, so the layer keeps its Keras-assigned default name.
        # beam_search_predict() looks this layer up as 'image_encoder', so
        # forwarding `name` would break that lookup.
        super().__init__()
        self.chexnet = create_chexnet(input_size=(224, 224))
        # The backbone is used as a fixed feature extractor.
        self.chexnet.trainable = False
        self.avgpool = tf.keras.layers.AveragePooling2D()

    def call(self, data):
        """Return pooled backbone features reshaped to (batch, positions, channels)."""
        pooled = self.avgpool(self.chexnet(data))
        dims = pooled.shape
        # Merge the two spatial axes into a single "positions" axis.
        flat_shape = (-1, dims[1] * dims[2], dims[3])
        return tf.reshape(pooled, shape=flat_shape)

# Função do Codificador
def encoder(image1, image2, dense_dim, dropout_rate):
    """Encode two images with shared weights and join them into one feature sequence.

    Both images go through the same ``Image_encoder`` instance and the same
    ``'bkdense'`` projection; the two projected sequences are concatenated
    along axis 1, batch-normalised and passed through dropout.

    Args:
        image1: First image tensor.
        image2: Second image tensor.
        dense_dim: Output width of the shared ReLU projection.
        dropout_rate: Rate of the final dropout layer.

    Returns:
        The output tensor of the ``'encoder_dropout'`` layer.
    """
    backbone = Image_encoder()
    raw_features = [backbone(view) for view in (image1, image2)]

    # One Dense layer applied to both views, so its weights are shared.
    projection = Dense(dense_dim, name='bkdense', activation='relu')
    projected = [projection(features) for features in raw_features]

    merged = Concatenate(axis=1)(projected)
    normalised = BatchNormalization(name="encoder_batch_norm")(merged)
    return Dropout(dropout_rate, name="encoder_dropout")(normalised)

# Camada de Atenção Global
class Global_Attention(tf.keras.layers.Layer):
    """Attention over the encoder sequence, conditioned on the decoder state.

    Scores are computed as ``V(tanh(W1(encoder_output) + W2(decoder_h)))`` and
    normalised with a softmax over axis 1 of the encoder output.
    """

    def __init__(self, dense_dim):
        super().__init__()
        self.W1 = Dense(units=dense_dim)  # projects the encoder output
        self.W2 = Dense(units=dense_dim)  # projects the decoder state
        self.V = Dense(units=1)           # collapses each position to one score

    def call(self, encoder_output, decoder_h):
        """Return ``(context_vector, attention_weights)`` for one decoding step."""
        # Add a length-1 axis so the state broadcasts against every position.
        query = tf.expand_dims(decoder_h, axis=1)
        scores = self.V(tf.nn.tanh(self.W1(encoder_output) + self.W2(query)))
        attention_weights = tf.nn.softmax(scores, axis=1)
        # Weighted sum of the encoder output over axis 1.
        context_vector = tf.reduce_sum(attention_weights * encoder_output, axis=1)
        return context_vector, attention_weights

# Camada de Decodificador de Um Passo
class One_Step_Decoder(tf.keras.layers.Layer):
    """Decode a single token: embed it, attend over the encoder output, run one GRU step.

    Notes:
        * ``name`` is accepted but not forwarded to the base class.
        * The recurrent layer is a GRU even though the attribute is called ``LSTM``.
        * ``self.dense`` is created but not used by ``call``.
    """

    def __init__(self, vocab_size, embedding_dim, max_pad, dense_dim, name="onestepdecoder"):
        super().__init__()
        self.dense_dim = dense_dim
        self.embedding = Embedding(
            input_dim=vocab_size + 1,
            output_dim=embedding_dim,
            input_length=max_pad,
            mask_zero=True,
            name='onestepdecoder_embedding',
        )
        self.LSTM = GRU(
            units=self.dense_dim,
            return_state=True,
            name='onestepdecoder_LSTM',
        )
        self.attention = Global_Attention(dense_dim=dense_dim)
        self.concat = Concatenate(axis=-1)
        self.dense = Dense(
            dense_dim,
            name='onestepdecoder_embedding_dense',
            activation='relu',
        )
        # The softmax activation means call() already returns probabilities.
        self.final = Dense(vocab_size + 1, activation='softmax')

    @tf.function
    def call(self, input_to_decoder, encoder_output, decoder_h):
        """Run one decoding step.

        Returns:
            ``(token_probabilities, new_decoder_state, attention_weights)``.
        """
        token_embedding = self.embedding(input_to_decoder)
        context_vector, attention_weights = self.attention(encoder_output, decoder_h)
        # Give the context vector a length-1 axis so it can be joined with the
        # embedding along the feature axis.
        gru_input = self.concat([tf.expand_dims(context_vector, axis=1), token_embedding])
        gru_output, next_h = self.LSTM(gru_input, initial_state=decoder_h)
        return self.final(gru_output), next_h, attention_weights

# Classe do Decodificador Completo
class Decoder(tf.keras.Model):
    """Decoder that runs ``One_Step_Decoder`` once per caption position.

    ``batch_size``, ``dense_dim`` and ``output_array`` are stored on the
    instance but are not read by ``call``.
    """

    def __init__(self, max_pad, embedding_dim, dense_dim, batch_size, vocab_size):
        super().__init__()
        self.onestepdecoder = One_Step_Decoder(
            vocab_size=vocab_size,
            embedding_dim=embedding_dim,
            max_pad=max_pad,
            dense_dim=dense_dim,
        )
        self.output_array = tf.TensorArray(tf.float32, size=max_pad)
        self.max_pad = max_pad
        self.batch_size = batch_size
        self.dense_dim = dense_dim

    @tf.function
    def call(self, encoder_output, caption):
        """Return per-step token probabilities with the step axis second.

        At every step the decoder is fed the corresponding column of
        ``caption``, not its own previous prediction.
        """
        # Initial state: zeros shaped like one position of the encoder output.
        hidden = tf.zeros_like(encoder_output[:, 0])
        step_outputs = tf.TensorArray(tf.float32, size=self.max_pad)
        for t in range(self.max_pad):
            token = caption[:, t:t + 1]
            probabilities, hidden, _ = self.onestepdecoder(token, encoder_output, hidden)
            step_outputs = step_outputs.write(t, probabilities)
        # stack() puts the step axis first; swap it with the batch axis.
        return tf.transpose(step_outputs.stack(), [1, 0, 2])

# Função para Criar o Modelo
def create_model():
    """Build the encoder/decoder captioning model and load its trained weights.

    Reads ``tokenizer.pkl`` and ``Encoder_Decoder_global_attention.h5`` from
    the current working directory.

    Returns:
        ``(model, tokenizer)``: the Keras model taking
        ``[image1, image2, caption]`` and the tokenizer loaded from disk.
    """
    image_shape = (224, 224) + (3,)
    # SECURITY NOTE: joblib.load unpickles the file, which can execute
    # arbitrary code. Only load a tokenizer file from a trusted source.
    tokenizer = joblib.load('tokenizer.pkl')
    max_pad, batch_size = 29, 100
    vocab_size = len(tokenizer.word_index)
    embedding_dim, dense_dim, dropout_rate = 300, 512, 0.2

    # Reset Keras global state before any layer is created.
    tf.keras.backend.clear_session()
    image1, image2 = Input(shape=image_shape), Input(shape=image_shape)
    caption = Input(shape=(max_pad,))

    encoder_output = encoder(image1, image2, dense_dim, dropout_rate)
    decoder = Decoder(max_pad, embedding_dim, dense_dim, batch_size, vocab_size)
    model = tf.keras.Model(
        inputs=[image1, image2, caption],
        outputs=decoder(encoder_output, caption),
    )
    model.load_weights('Encoder_Decoder_global_attention.h5')
    return model, tokenizer

# Função de Predição com Busca por Feixe
def beam_search_predict(image1, image2, model, tokenizer, input_size=(224, 224), beam_width=3, max_pad=29):
    """Generate a caption for an image pair with beam search.

    Args:
        image1: First image array. predict1/predict2 pass arrays that have
            already been through DenseNet ``preprocess_input``.
        image2: Second image array, same expectations as ``image1``.
        model: Model built by ``create_model``; its layers are looked up by
            the names 'image_encoder', 'bkdense', 'concatenate',
            'encoder_batch_norm', 'encoder_dropout' and 'decoder'.
        tokenizer: Tokenizer that maps '<cls>' and '<end>' to token ids and
            token ids back to text.
        input_size: Size passed to ``cv2.resize`` before encoding.
        beam_width: Number of hypotheses kept after every step. Values larger
            than the vocabulary are clipped to the vocabulary size.
        max_pad: Maximum number of decoding steps (defaults to 29, the
            caption length used by ``create_model``).

    Returns:
        The decoded caption with the '<cls>' prefix and everything from
        '<end>' onwards removed.
    """
    # Run the encoder half of the model by hand.
    image1 = tf.expand_dims(cv2.resize(image1, input_size, interpolation=cv2.INTER_NEAREST), axis=0)
    image2 = tf.expand_dims(cv2.resize(image2, input_size, interpolation=cv2.INTER_NEAREST), axis=0)
    image1 = model.get_layer('image_encoder')(image1)
    image2 = model.get_layer('image_encoder')(image2)
    image1 = model.get_layer('bkdense')(image1)
    image2 = model.get_layer('bkdense')(image2)
    concat = model.get_layer('concatenate')([image1, image2])
    enc_op = model.get_layer('encoder_batch_norm')(concat)
    enc_op = model.get_layer('encoder_dropout')(enc_op)

    initial_h = tf.zeros_like(enc_op[:, 0])
    start_id = tokenizer.texts_to_sequences(['<cls>'])[0][0]
    end_id = tokenizer.texts_to_sequences(['<end>'])[0][0]
    one_step_decoder = model.get_layer('decoder').onestepdecoder

    beam = [([start_id], initial_h, 0.0)]  # (sequence, hidden state, log_prob)
    complete_sequences = []

    for _step in range(max_pad):
        new_beam = []
        for seq, h, log_prob in beam:
            if seq[-1] == end_id:
                complete_sequences.append((seq, log_prob))
                continue
            input_token = np.array([[seq[-1]]])
            output, new_h, _attention = one_step_decoder(input_token, enc_op, h)
            # The decoder's final Dense layer already applies softmax, so
            # `output` holds probabilities. Applying softmax a second time
            # squashes them towards uniform and corrupts the cumulative
            # log-probabilities used to rank hypotheses.
            probs = output[0].numpy()
            # Never ask for more candidates than the vocabulary has.
            k = max(0, min(beam_width, probs.shape[0]))
            top_k_indices = np.argsort(probs)[probs.shape[0] - k:]
            top_k_log_probs = np.log(probs[top_k_indices] + 1e-10)
            for word_id, word_log_prob in zip(top_k_indices, top_k_log_probs):
                word_id = int(word_id)  # keep sequences as plain Python ints
                new_seq = seq + [word_id]
                new_log_prob = log_prob + float(word_log_prob)
                if word_id == end_id:
                    complete_sequences.append((new_seq, new_log_prob))
                else:
                    new_beam.append((new_seq, new_h, new_log_prob))
        if not new_beam:
            break
        new_beam = sorted(new_beam, key=lambda x: x[2], reverse=True)[:beam_width]
        beam = new_beam

    # Unfinished hypotheses compete with finished ones on raw log-probability.
    all_sequences = complete_sequences + [(seq, log_prob) for seq, _, log_prob in beam]
    if not all_sequences:
        return "<cls>"
    best_seq, _ = max(all_sequences, key=lambda x: x[1])
    caption = tokenizer.sequences_to_texts([best_seq])[0]
    words = caption.split()
    # Guard against an empty decode before indexing.
    if words and words[0] == '<cls>':
        words = words[1:]
    if '<end>' in words:
        words = words[:words.index('<end>')]
    return ' '.join(words)

# Função de Predição Simples
def predict1(image1, image2=None, model_tokenizer=None):
    """Caption a pair of image files.

    Args:
        image1: Path of the first image.
        image2: Path of the second image; when omitted, ``image1`` is used
            for both inputs.
        model_tokenizer: Optional ``(model, tokenizer)`` pair. When omitted,
            ``create_model()`` is called to build one.

    Returns:
        The predicted caption, or ``None`` (after printing a message) when
        either file cannot be read as an image.
    """
    second_path = image1 if image2 is None else image2
    views = [cv2.imread(path, cv2.IMREAD_UNCHANGED) for path in (image1, second_path)]
    if any(view is None for view in views):
        print("Deve ser uma imagem")
        return None

    prepared = []
    for view in views:
        # Single-channel images are replicated to three channels.
        if view.ndim == 2:
            view = cv2.merge([view, view, view])
        prepared.append(tf.keras.applications.densenet.preprocess_input(view))

    if model_tokenizer is None:
        model, tokenizer = create_model()
    else:
        model, tokenizer = model_tokenizer[0], model_tokenizer[1]
    return beam_search_predict(prepared[0], prepared[1], model, tokenizer, beam_width=3)

# Função de Predição com Avaliação BLEU
def predict2(true_caption, image1, image2=None, model_tokenizer=None):
    """Caption a pair of image files and score the result against a reference.

    Args:
        true_caption: Reference caption; it is split on whitespace.
        image1: Path of the first image.
        image2: Path of the second image; when omitted, ``image1`` is used
            for both inputs.
        model_tokenizer: Optional ``(model, tokenizer)`` pair. When omitted,
            ``create_model()`` is called to build one.

    Returns:
        A one-row DataFrame with columns ``bleu1`` .. ``bleu4``, or ``None``
        (after printing a message) when either file cannot be read as an
        image.
    """
    second_path = image1 if image2 is None else image2
    views = [cv2.imread(path, cv2.IMREAD_UNCHANGED) for path in (image1, second_path)]
    if any(view is None for view in views):
        print("Deve ser uma imagem")
        return None

    prepared = []
    for view in views:
        # Single-channel images are replicated to three channels.
        if view.ndim == 2:
            view = cv2.merge([view, view, view])
        prepared.append(tf.keras.applications.densenet.preprocess_input(view))

    if model_tokenizer is None:
        model, tokenizer = create_model()
    else:
        model, tokenizer = model_tokenizer[0], model_tokenizer[1]
    predicted_caption = beam_search_predict(prepared[0], prepared[1], model, tokenizer, beam_width=3)

    reference = [true_caption.split()]
    prediction = predicted_caption.split()
    # n-gram weights for each BLEU variant, in output column order.
    ngram_weights = {
        'bleu1': (1, 0, 0, 0),
        'bleu2': (0.5, 0.5, 0, 0),
        'bleu3': (0.33, 0.33, 0.33, 0),
        'bleu4': (0.25, 0.25, 0.25, 0.25),
    }
    scores = {
        column: sentence_bleu(reference, prediction, weights=weights)
        for column, weights in ngram_weights.items()
    }
    return pd.DataFrame([scores], columns=list(ngram_weights))

# Função para Predição em Lote
def function1(image1_list, image2_list, model_tokenizer=None):
    """Caption several image pairs with one shared model.

    Args:
        image1_list: Paths of the first image of each pair.
        image2_list: Paths of the second image of each pair; paired with
            ``image1_list`` by position (extra entries in the longer list are
            ignored).
        model_tokenizer: Optional ``(model, tokenizer)`` pair. When omitted,
            the model is built once via ``create_model()`` and reused.

    Returns:
        A list with one ``predict1`` result per pair.
    """
    if model_tokenizer is None:
        model_tokenizer = list(create_model())
    return [
        predict1(first, second, model_tokenizer)
        for first, second in zip(image1_list, image2_list)
    ]

# Função para Predição em Lote com Avaliação BLEU
def function2(true_caption_list, image1_list, image2_list):
    """Caption several image pairs and collect their BLEU scores.

    The model is built once via ``create_model()`` and reused for every pair.

    Args:
        true_caption_list: Reference caption for each pair.
        image1_list: Paths of the first image of each pair.
        image2_list: Paths of the second image of each pair.
            The three lists are paired by position.

    Returns:
        A DataFrame with columns ``bleu1`` .. ``bleu4`` and one row per pair.
        A pair for which ``predict2`` returns ``None`` (unreadable image)
        yields a row of NaN so the output stays aligned with the inputs.
    """
    columns = ['bleu1', 'bleu2', 'bleu3', 'bleu4']
    model_tokenizer = list(create_model())
    # Collect plain records and build the frame once: DataFrame.append was
    # removed in pandas 2.0, and growing a frame row by row is quadratic.
    rows = []
    for c, i1, i2 in zip(true_caption_list, image1_list, image2_list):
        scores = predict2(c, i1, i2, model_tokenizer)
        if scores is None:
            rows.append(dict.fromkeys(columns, float('nan')))
        else:
            rows.extend(scores.to_dict('records'))
    return pd.DataFrame(rows, columns=columns)

In [None]:
# Example usage: caption two image pairs. The lists are paired by position
# ("img1.jpg" with "img3.jpg", "img2.jpg" with "img4.jpg").
image1_list = ["img1.jpg", "img2.jpg"]
image2_list = ["img3.jpg", "img4.jpg"]
# No model_tokenizer is passed, so function1 builds the model via
# create_model() before captioning.
captions = function1(image1_list, image2_list)
print(captions)