In [None]:
import os
import re
import cv2
import pickle
import time
import tarfile
import datetime
import warnings
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from tensorflow import concat, repeat
from collections import Counter
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from sklearn.utils import shuffle
from skimage.transform import resize
import nltk.translate.bleu_score as bleu
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.backend import expand_dims 
from nltk.translate.bleu_score import sentence_bleu
from tensorflow.keras.layers import TimeDistributed, concatenate, Concatenate, Input, Softmax, RNN, Dense, Embedding, LSTM, Layer, Dropout, GRU
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.applications.densenet import preprocess_input
from google.colab.patches import cv2_imshow

# Suppress every Python warning for the rest of the session.
# NOTE(review): this also hides deprecation notices from the libraries imported above.
warnings.filterwarnings('ignore')

In [None]:
# Mount Google Drive at /content/drive; the data files read below are addressed under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Read the training "findings" file, one stripped line per row, into a one-column frame.
with open("/content/drive/MyDrive/mimic_cxr/train.findings.tok", "r") as findings_file:
    sentences = [line.strip() for line in findings_file]
train_text_df = pd.DataFrame({'findings': sentences})
train_text_df.shape

In [None]:
# Read the training "impression" file, one stripped line per row, into a one-column frame.
with open("/content/drive/MyDrive/mimic_cxr/train.impression.tok", "r") as impressions_file:
    sentences = [line.strip() for line in impressions_file]
train_text_summary_output_df = pd.DataFrame({'impressions': sentences})
train_text_summary_output_df.shape

In [None]:
# The image embeddings were already computed as a step of the third approach,
# so the saved archive is loaded here instead of recomputing them.
embeddings_archive_path = '/content/drive/My Drive/mimic_cxr/chexpert_embeddings_train_final.npz'
image_embeddings_train_npz = np.load(embeddings_archive_path)
# The embeddings are stored in the archive under the key 'a'.
image_embeddings_train_np = image_embeddings_train_npz['a']
image_embeddings_train_np.shape

In [None]:
# Put findings, impressions and the per-row image embedding side by side (aligned on the index).
image_feature_column = pd.Series(list(image_embeddings_train_np), name='image_features')
train_df = pd.concat(
    [train_text_df, train_text_summary_output_df, image_feature_column],
    axis=1,
)

# Decoder input is the impression prefixed with '<start>'; decoder target is the impression
# followed by '<end>'. Both are built before 'impressions' itself is wrapped in both markers.
impression_text = train_df['impressions'].astype(str)
train_df['dec_ip'] = '<start> ' + impression_text
train_df['dec_op'] = impression_text + ' <end>'
train_df['impressions'] = '<start> ' + train_df['impressions'] + ' <end>'
train_df.shape

In [None]:
# Count missing values per column.
train_df.isna().sum()

In [None]:
# Stack the per-image embeddings into one float array (np.vstack returns a new array,
# so the loaded embeddings are left untouched).
# `np.float` was only an alias of the builtin `float` and has been removed from recent
# NumPy releases; `np.float64` is the same dtype and works on every version.
train_image_features = np.vstack(image_embeddings_train_np).astype(np.float64)

In [None]:
# Rows 0..9999 are held out for validation, rows 10000..49999 are used for training.
# The same positional ranges are applied to the frame and to the feature array.
validation_rows = slice(None, 10000)
train_rows = slice(10000, 50000)

validation_ff = train_df.iloc[validation_rows]
train_ff = train_df.iloc[train_rows]
validation_image_features_ff = train_image_features[validation_rows]
train_image_features_ff = train_image_features[train_rows]

In [None]:
# The filter list deliberately omits '.', '<' and '>' so that full stops and the
# '<start>' / '<end>' markers are not stripped from the text.
token = Tokenizer(filters='!"#$%&()*+,-/:;=?@[\\]^_`{|}~\t\n')

# Fit on the text the decoder actually consumes and predicts. The vocabulary was previously
# built from 'findings', but only the impression columns are encoded below; words that occur
# only in impressions -- including the '<start>' / '<end>' markers, which are added to the
# impressions only -- would then be silently dropped by texts_to_sequences.
token.fit_on_texts(train_ff['impressions'].astype(str))

# Reserve index 0 for padding.
token.word_index['<pad>'] = 0
token.index_word[0] = '<pad>'
# word_index already contains '<pad>' at this point, so this leaves one spare, unused index.
all_words_len = len(token.word_index) + 1

# Encode decoder inputs / targets as integer sequences.
train_decoder_input = token.texts_to_sequences(train_ff.dec_ip)
train_decoder_output = token.texts_to_sequences(train_ff.dec_op)
val_decoder_input = token.texts_to_sequences(validation_ff.dec_ip)
val_decoder_output = token.texts_to_sequences(validation_ff.dec_op)

# Pad (at the end) every sequence to a common length.
max_len = 150
decoder_input = pad_sequences(train_decoder_input, maxlen=max_len, padding='post')
decoder_output = pad_sequences(train_decoder_output, maxlen=max_len, padding='post')
val_decoder_input = pad_sequences(val_decoder_input, maxlen=max_len, padding='post')
val_decoder_output = pad_sequences(val_decoder_output, maxlen=max_len, padding='post')

# Plain-dict copies of the vocabulary in both directions (word -> index, index -> word).
word_index = dict(token.word_index)
index_word = {index: word for word, index in word_index.items()}


In [None]:
batch_size = 100
buffer_len = 500


def _make_dataset(image_features, decoder_in, decoder_out, shuffle):
    """Build a batched, prefetched dataset of ((image_features, decoder_in), decoder_out).

    Args:
        image_features: Array of image feature rows.
        decoder_in: Padded decoder input sequences, one per feature row.
        decoder_out: Padded decoder target sequences, one per feature row.
        shuffle: Whether to shuffle with a buffer of `buffer_len` elements before batching.

    Returns:
        A `tf.data.Dataset` yielding batches of exactly `batch_size` elements.
    """
    dataset = tf.data.Dataset.from_tensor_slices(((image_features, decoder_in), decoder_out))
    if shuffle:
        dataset = dataset.shuffle(buffer_len)
    # drop_remainder=True guarantees every batch has exactly `batch_size` rows, which is the
    # batch size the model is constructed with; a shorter trailing batch is discarded.
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)


train_final_dataset = _make_dataset(
    train_image_features_ff, decoder_input, decoder_output, shuffle=True)

# Validation batches are only evaluated, so shuffling them is unnecessary.
validation_final_dataset = _make_dataset(
    validation_image_features_ff, val_decoder_input, val_decoder_output, shuffle=False)

In [None]:
class CustomEncoder(tf.keras.Model):
    """Encoder that projects its input through a single dense layer of width `lstm_units`."""

    def __init__(self, lstm_units):
        super().__init__()

        self.lstm_units = lstm_units
        self.dense = Dense(
            lstm_units,
            kernel_initializer="glorot_uniform",
            name='encoder_dense_layer',
        )

    def initialize_states(self, batch_size):
        """Return an all-zero state of shape (batch_size, lstm_units).

        The batch size and the created state are also kept on the instance as
        `batch_size` and `in_state`.
        """
        self.batch_size = batch_size
        self.in_state = tf.zeros((batch_size, self.lstm_units))
        return self.in_state

    def call(self, inp):
        """Apply the dense projection to `inp` and return the result."""
        return self.dense(inp)


class CustomAttentionLayer(tf.keras.layers.Layer):
    """Additive attention: score = v(tanh(W1 * decoder_state + W2 * encoder_output))."""

    def __init__(self, attn_units):
        super().__init__()

        self.attn_units = attn_units

        self.dense_1 = tf.keras.layers.Dense(self.attn_units, kernel_initializer="glorot_uniform", name='Concat_dense_1')
        self.dense_2 = tf.keras.layers.Dense(self.attn_units, kernel_initializer="glorot_uniform", name='Concat_dense_2')
        self.final_dense_layer = tf.keras.layers.Dense(1, kernel_initializer="glorot_uniform", name='final_dense_layer_layer')

    def call(self, x):
        """Compute the context vector for one decoding step.

        Args:
            x: Pair `(dec_hidden_state, enc_op)`. `dec_hidden_state` is the decoder state of
                shape (batch, units). `enc_op` is the encoder output, either
                (batch, positions, units) or (batch, units) for a single feature vector.

        Returns:
            `(context_vector, attn_wghts)`: the attention-weighted sum of the encoder output
            over the positions axis, and the weights of shape (batch, positions, 1).
        """
        dec_hidden_state, enc_op = x
        enc_op = tf.convert_to_tensor(enc_op)

        # A rank-2 encoder output is one feature vector per sample. Give it an explicit
        # positions axis of length 1; without it, (batch, 1, attn) + (batch, attn) broadcasts
        # to (batch, batch, attn) and every sample would attend over the *other samples* of
        # the batch instead of over its own features.
        if enc_op.shape.rank == 2:
            enc_op = tf.expand_dims(enc_op, axis=1)

        # (batch, units) -> (batch, 1, units) so it broadcasts along the positions axis.
        dec_hidden_state = tf.expand_dims(dec_hidden_state, axis=1)

        score = self.final_dense_layer(
            tf.nn.tanh(self.dense_1(dec_hidden_state) + self.dense_2(enc_op)))

        # Normalise over the positions axis, then take the weighted sum of the encoder output.
        attn_wghts = tf.nn.softmax(score, axis=1)
        context_vector = tf.reduce_sum(attn_wghts * enc_op, axis=1)

        return context_vector, attn_wghts


class CustomSingleStepDecoder(tf.keras.Model):
    """Decodes a single time step: embed the token, attend, run the GRU, project to the vocabulary."""

    def __init__(self, all_words_len, embedding_dim, lstm_units, attn_units):
        super().__init__()

        self.lstm_units = lstm_units
        self.all_words_len = all_words_len
        self.embedding_dim = embedding_dim
        self.attn_units = attn_units

        self.dense = Dense(self.all_words_len, kernel_initializer="glorot_uniform", name='onestep_dense')
        self.attention = CustomAttentionLayer(self.attn_units)
        self.decoder_emb = Embedding(self.all_words_len, self.embedding_dim, trainable=True, name='Decoder_embedding')
        # The recurrent layer is a GRU; the layer name is kept as-is so existing weights still match.
        self.decoder_gru = GRU(self.lstm_units, return_state=True, return_sequences=True, name="Decoder_LSTM")

        self.dropout_layer_1 = Dropout(0.3, name='dropout_layer_1')
        self.dropout_layer_2 = Dropout(0.3, name='dropout_layer_2')
        self.dropout_layer_3 = Dropout(0.3, name='dropout_layer_3')

    @tf.function
    def call(self, x, training=None):
        """Run one decoding step.

        Args:
            x: Triple `(dec_ip, enc_op, state_h)`: the current token ids (callers pass a
                (batch, 1) slice), the encoder output, and the previous decoder state.
            training: Forwarded to the dropout and GRU layers; when None, Keras decides
                the mode from the surrounding call.

        Returns:
            `(output, hidden_state, attn_wghts, context_vector)` where `output` holds the
            vocabulary logits of this step and `hidden_state` is the new decoder state.
        """
        # Locals only: nothing is stored on `self`, so no graph tensors outlive the traced call.
        dec_ip, enc_op, state_h = x

        embd_layer_op = self.decoder_emb(dec_ip)
        embd_layer_op = self.dropout_layer_1(embd_layer_op, training=training)

        context_vector, attn_wghts = self.attention([state_h, enc_op])

        # Prepend nothing, append nothing: context and embedding are joined on the feature axis.
        final_decoder_input = tf.concat([tf.expand_dims(context_vector, 1), embd_layer_op], -1)
        final_decoder_input = self.dropout_layer_2(final_decoder_input, training=training)

        gru_layer_output, hidden_state = self.decoder_gru(
            final_decoder_input, initial_state=state_h, training=training)

        # Collapse the time axis: (batch, steps, units) -> (batch * steps, units).
        gru_layer_output = tf.reshape(gru_layer_output, (-1, gru_layer_output.shape[2]))
        gru_layer_output = self.dropout_layer_3(gru_layer_output, training=training)

        output = self.dense(gru_layer_output)

        return output, hidden_state, attn_wghts, context_vector


class CustomDecoder(tf.keras.Model):
    """Runs `CustomSingleStepDecoder` over every time step of the decoder input."""

    def __init__(self, all_words_len, embedding_dim, lstm_units, attn_units):
        super().__init__()

        self.lstm_units = lstm_units
        self.all_words_len = all_words_len
        self.embedding_dim = embedding_dim
        self.attn_units = attn_units

        self.onestepdecoder = CustomSingleStepDecoder(self.all_words_len, self.embedding_dim, self.lstm_units, self.attn_units)

    @tf.function
    def call(self, x, training=None):
        """Decode a whole sequence, feeding each input token and carrying the state forward.

        Args:
            x: Triple `(dec_ip, enc_op, dec_hidden_state)`: decoder input token ids of shape
                (batch, steps), the encoder output, and the initial decoder state.
            training: Forwarded to the single-step decoder.

        Returns:
            The per-step outputs stacked and transposed to (batch, steps, vocabulary).
        """
        # The running state is a local loop variable rather than an attribute on `self`.
        dec_ip, enc_op, dec_hidden_state = x
        num_steps = dec_ip.shape[1]
        model_outputs = tf.TensorArray(tf.float32, size=num_steps, name='output_arrays')

        for t in tf.range(num_steps):
            step_input = [dec_ip[:, t:t+1], enc_op, dec_hidden_state]
            output, hidden_state, attn_wghts, context_vector = self.onestepdecoder(
                step_input, training=training)

            dec_hidden_state = hidden_state
            model_outputs = model_outputs.write(t, output)

        # stack() yields (steps, batch, vocabulary); put the batch axis first.
        model_outputs = tf.transpose(model_outputs.stack(), [1, 0, 2])

        return model_outputs


class CustomEncoderDecoder(tf.keras.Model):
    """End-to-end model: encode the image features, then decode the token sequence."""

    def __init__(self, all_words_len, embedding_dim, lstm_units, attn_units, batch_size):
        super().__init__()

        self.all_words_len = all_words_len
        # Kept for backward compatibility; the initial state is sized from the actual input batch.
        self.batch_size = batch_size
        self.lstm_units = lstm_units
        self.embedding_dim = embedding_dim
        self.attn_units = attn_units

        self.encoder = CustomEncoder(self.lstm_units)
        self.decoder = CustomDecoder(all_words_len, embedding_dim, lstm_units, attn_units)
        # Not used by `call`; retained so the attribute stays available.
        self.dense = Dense(self.all_words_len, kernel_initializer="glorot_uniform", name='enc_dec_dense')

    def call(self, data, training=None):
        """Run the encoder and the decoder on one batch.

        Args:
            data: Pair `(image_features, decoder_input_tokens)`.
            training: Forwarded to the decoder.

        Returns:
            The decoder output for every time step.
        """
        # Locals are used on purpose: assigning `self.inputs` / `self.outputs` would overwrite
        # attributes that Keras models already define.
        image_features, decoder_tokens = data[0], data[1]

        enc_op = self.encoder(image_features)

        # Size the zero initial state from the batch that was actually passed in, so batches
        # smaller than the constructor's `batch_size` (a last partial batch, or a single
        # sample at inference) work as well.
        actual_batch = tf.shape(image_features)[0]
        initial_state = tf.zeros((actual_batch, self.lstm_units))

        return self.decoder([decoder_tokens, enc_op, initial_state], training=training)


In [None]:
optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per token so padding can be masked out below.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')


def loss_function(real, pred):
    """Cross-entropy on logits in which positions whose target id is 0 contribute zero.

    The masked per-token losses are averaged over all positions, masked ones included.
    """
    per_token_loss = loss_object(real, pred)
    is_real_token = tf.math.not_equal(real, 0)
    token_weights = tf.cast(is_real_token, dtype=per_token_loss.dtype)
    return tf.reduce_mean(per_token_loss * token_weights)

In [None]:

import os

# The epoch number is formatted into the checkpoint file name.
checkpoint_path = "./training_2/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Write the weights (not the whole model) at the end of every epoch.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    save_freq='epoch',
    verbose=1,
)

In [None]:
# Model hyper-parameters.
lstm_units = 256
embedding_dim = 300
attn_units = 64

# Reset the global Keras state before building a new model.
tf.keras.backend.clear_session()

Attention_model = CustomEncoderDecoder(
    all_words_len,
    embedding_dim,
    lstm_units,
    attn_units,
    batch_size,
)
Attention_model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss=loss_function,
)
Attention_model.fit(
    train_final_dataset,
    validation_data=validation_final_dataset,
    epochs=1,
    callbacks=[cp_callback],
    shuffle=True,
)

In [None]:
# Persist the trained model object to Drive. The `with` block makes sure the file is flushed
# and closed even if pickling fails part-way.
# NOTE(review): pickling a Keras model may not work on every TensorFlow version; the weight
# checkpoints written during training are an alternative -- confirm which one is relied on.
with open('/content/drive/My Drive/cv_model.pkl', 'wb') as model_file:
    pickle.dump(Attention_model, model_file)

In [None]:
# Inspect the sub-layers of the trained model. No variable called `model` is defined in the
# cells above, so the bare name fails on a fresh kernel; the model is `Attention_model`.
Attention_model.layers

In [None]:
def measure_performance(img_data, model, max_seq_len=75, beam_width=3):
    """Generate text for one sample with beam search and print it with its BLEU score.

    Args:
        img_data: One data row as a sequence; index 1 is read as the reference text and
            index 2 as the image feature vector.
        model: A trained `CustomEncoderDecoder`; its `encoder` and `decoder` attributes are used.
        max_seq_len: Maximum number of decoding steps.
        beam_width: Number of partial sequences kept after every step (must be >= 1).

    Returns:
        None. The predicted text and the BLEU score are printed.

    Raises:
        ValueError: If `beam_width` is smaller than 1.
    """
    if beam_width < 1:
        raise ValueError("beam_width must be at least 1")

    start_marker, end_marker = '<start>', '<end>'

    # Sub-models are reached through their attributes rather than through an auto-generated
    # layer name, which depends on the class name and on how many models were built before.
    encoder = model.encoder
    step_decoder = model.decoder.onestepdecoder

    # One sample, whatever the feature width is.
    img_features = np.asarray(img_data[2]).reshape((1, -1))
    init_state = encoder.initialize_states(1)
    encoder_output = encoder(img_features)

    # Every beam is [text so far, decoder state, cumulative negative log-probability].
    beams = [[start_marker, init_state, 0.0]]
    finished = []
    open_slots = beam_width

    for _ in range(max_seq_len):
        candidates = []
        for text, state, score in beams:
            last_token = np.reshape(word_index[text.split(" ")[-1]], (1, 1))
            logits, next_state, _attn, _context = step_decoder([last_token, encoder_output, state])
            probs = np.array(tf.nn.softmax(logits)).flatten()

            for idx in np.argsort(probs)[-open_slots:]:
                word = index_word.get(int(idx))
                if word is None:
                    # Output indices without a vocabulary entry cannot be rendered as text.
                    continue
                candidates.append([text + ' ' + word, next_state, score - np.log(probs[idx])])

        # Keep the lowest-cost candidates and move the completed ones aside.
        beams = []
        for text, state, score in sorted(candidates, key=lambda cand: cand[2])[:open_slots]:
            if text.split(" ")[-1] == end_marker:
                # Length-normalise by the number of tokens (not characters) so that longer
                # sequences are not penalised merely for having more terms in the sum.
                finished.append([text, state, score / len(text.split(" "))])
            else:
                beams.append([text, state, score])

        # Each completed sequence permanently occupies one slot of the beam.
        open_slots = beam_width - len(finished)
        if not beams:
            break

    # Lower cost is better in both lists.
    if finished:
        best_text = min(finished, key=lambda cand: cand[2])[0]
    elif beams:
        best_text = min(beams, key=lambda cand: cand[2])[0]
    else:
        best_text = start_marker

    markers = (start_marker, end_marker)
    hypothesis = [tok for tok in best_text.split() if tok not in markers]
    # Predictions come from the tokenizer vocabulary, which Keras lower-cases by default,
    # so the reference is lower-cased as well before comparing.
    reference = [tok for tok in str(img_data[1]).lower().split() if tok not in markers]

    result = ' '.join(hypothesis)
    print("Predicted text:", result)
    # sentence_bleu expects a list of tokenised references and a tokenised hypothesis;
    # passing raw strings would score individual characters.
    print('BLEU Score:', sentence_bleu([reference], hypothesis))

In [None]:
# Load the pickled model back from Drive.
# Only unpickle files you created yourself: unpickling can execute arbitrary code.
model_pickle_path = '/content/drive/My Drive/cv_model.pkl'
with open(model_pickle_path, 'rb') as model_file:
    att_model = pickle.load(model_file)

In [None]:
# Evaluate a single row (position 10001) of the full frame with the reloaded model.
sample_row = train_df.iloc[10001].to_numpy()
measure_performance(sample_row, att_model)

>>>>>>>>>img_data ['<start> Frontal and lateral views of the chest demonstrate normal cardiomediastinal silhouette. The lungs are clear. There is no pneumothorax, vascular congestion, or pleural effusion. Several cholecystectomy clips are seen in the gallbladder fossa. <end>'
 'No evidence of pneumonia.'
 array([0.002626  , 0.0218488 , 0.08873988, ..., 0.00113838, 0.01137492,
        0.16669281])
 '<start> Frontal and lateral views of the chest demonstrate normal cardiomediastinal silhouette. The lungs are clear. There is no pneumothorax, vascular congestion, or pleural effusion. Several cholecystectomy clips are seen in the gallbladder fossa.'
 'Frontal and lateral views of the chest demonstrate normal cardiomediastinal silhouette. The lungs are clear. There is no pneumothorax, vascular congestion, or pleural effusion. Several cholecystectomy clips are seen in the gallbladder fossa. <end>']
>>>>>>>>>>>>>>>>>>1
>>>>>>>>>>>>>>>>>>2
>>>>>>>>>>>>>>>>>>3
Predicted Report : pa and lateral v