In [None]:
!pip install contractions transformers torch torchvision

import pandas as pd
import re
import nltk
from bs4 import BeautifulSoup
from nltk.corpus import stopwords

nltk.download('stopwords')
stop_words = set(stopwords.words('english'))
stop_words.difference_update({'no', 'not'})

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings
import cv2
from sklearn.model_selection import train_test_split
from PIL import Image
import tensorflow as tf
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertModel, BertTokenizer, GPT2LMHeadModel, GPT2Tokenizer, ViTModel, ViTFeatureExtractor
from tqdm import tqdm

warnings.filterwarnings("ignore")

In [None]:
import contractions

In [None]:
# Projection table: later cells use its `uid`, `projection` and `filename` columns.
df2 = pd.read_csv('/kaggle/input/chest-xrays-indiana-university/indiana_projections.csv')
# Report table: later cells use `uid` plus the free-text `findings` / `impression` columns.
df1 = pd.read_csv('/kaggle/input/chest-xrays-indiana-university/indiana_reports.csv')


In [None]:
# Keep a single row per (uid, projection) pair so the pivot in the next cell has unique cells.
df2 = df2.drop_duplicates(subset=['uid', 'projection'])
directory_path = r'/kaggle/input/chest-xrays-indiana-university/images/images_normalized'
# Turn bare file names into full paths. Names that already carry the prefix are left
# alone, so re-running this cell does not prepend the directory a second time.
df2['filename'] = df2['filename'].apply(
    lambda x: x if str(x).startswith(f'{directory_path}/') else f'{directory_path}/{x}'
)

In [None]:
# Reshape to one row per `uid` with one column per projection value; each cell holds
# that view's image path. Later cells read the "Frontal" and "Lateral" columns.
pivot_df = df2.pivot(index='uid', columns='projection', values='filename').reset_index()

In [None]:
def create_sentence_report(row):
    """Join a report's findings and impression into a single text.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) with 'findings' and
            'impression' entries.

    Returns:
        The available sections separated by one space. Missing (NaN/None)
        sections are skipped, so the result may be an empty string.
    """
    sections = []
    for column in ('findings', 'impression'):
        value = row[column]
        # Formatting a missing value directly would inject the literal word "nan".
        if pd.isna(value):
            continue
        sections.append(str(value).strip())
    # The separator keeps the last word of one section from fusing with the next.
    return ' '.join(section for section in sections if section)

In [None]:
# Build the combined report text row by row (axis=1 passes each row to the function).
df1['report'] = df1.apply(create_sentence_report, axis=1)

In [None]:
def expand_contractions(text):
    """Expand a small, hand-maintained set of English contractions.

    Args:
        text: Input string.

    Returns:
        ``text`` with every whole-word occurrence of a known contraction
        replaced by its expanded form. Matching is case-sensitive.
    """
    # Named `expansions` so it does not shadow the imported `contractions` module.
    expansions = {
        "isn't": "is not",
        "doesn't": "does not",
        # Add more contractions as needed
    }
    for contraction, expanded in expansions.items():
        # re.escape keeps any future key containing regex metacharacters literal.
        text = re.sub(r'\b{}\b'.format(re.escape(contraction)), expanded, text)
    return text

In [None]:
def preprocess_text(text):
    """Normalise a raw report string before tokenisation.

    The text is lower-cased, contractions are expanded, HTML markup is
    stripped, characters other than letters, whitespace and periods are
    removed, words containing 'xx' and the phrase 'year old' are deleted,
    whitespace is collapsed, and stop words and very short words are dropped.

    The negations 'no' and 'not' are always kept: removing them would turn
    "no effusion" into "effusion". This matches the module-level stop-word
    customisation made at import time.

    Args:
        text: Raw report string.

    Returns:
        The cleaned, space-separated report string.
    """
    negations = {'no', 'not'}
    removable_words = set(stopwords.words('english')) - negations

    # Convert to lowercase
    text = text.lower()

    # Expand contractions
    text = contractions.fix(text)

    # Remove HTML tags
    text = BeautifulSoup(text, "html.parser").get_text()

    # Remove non-alphabetical characters (except spaces and periods)
    text = re.sub(r'[^a-zA-Z\s.]', '', text)

    # Remove words containing 'xx' or similar patterns
    text = re.sub(r'\bxx+\w*\b', '', text)
    text = re.sub(r'\b\w*xx+\w*\b', '', text)

    # Remove the phrase 'year old'
    text = re.sub(r'\byear old\b', '', text)

    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    # Drop stop words and words shorter than three characters. Negations are
    # exempt from both filters ('no' would otherwise fail the length check).
    kept_words = [
        word for word in text.split()
        if word in negations or (word not in removable_words and len(word) > 2)
    ]
    return ' '.join(kept_words)

In [None]:
# Replace each combined report with its cleaned version (overwrites the column in place).
df1['report'] = df1['report'].apply(preprocess_text)

In [None]:
# Raw report columns that are dropped now that the `report` column has been built.
columns_to_combine = ['MeSH','Problems', 'image', 'indication', 'comparison', 'findings', 'impression']
# Rebind instead of dropping in place and tolerate already-missing columns, so
# re-running this cell does not raise KeyError.
df1 = df1.drop(columns=columns_to_combine, errors='ignore')

In [None]:
# Attach each study's image paths to its report (inner join on `uid`), then
# discard every row that still has a missing value in any column.
data = pivot_df.merge(df1, on='uid', how='inner').dropna()


In [None]:
data

In [None]:
df1["report"][6]

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model
import os

In [None]:
from keras.layers import Input, Dense

In [None]:
from tensorflow.keras.models import Sequential

In [None]:
from tensorflow.keras.layers import Dense, Flatten

In [None]:
from tensorflow.keras.applications import ResNet50

# Define ResNet50 model
image_shape = (224, 224, 3)
image_input = Input(shape=image_shape)

# ImageNet-pretrained ResNet50 without its classification head (include_top=False).
base_model = ResNet50(weights="imagenet",include_top=False, input_tensor=image_input, input_shape=image_shape)
# Wrapper model exposing the backbone's final output; used for feature extraction below.
resnet_model = Model(inputs=base_model.input, outputs=base_model.output)

In [None]:
resnet_model.summary()

In [None]:
def _load_xray(path):
    """Read one image as RGB, scale it by 1/255, resize to 224x224 and add a batch axis."""
    # The context manager releases the file handle once the pixels are copied out.
    with Image.open(path) as img:
        pixels = np.asarray(img.convert("RGB"))
    # Normalise the pixel values.
    pixels = pixels / 255
    # Resize to the (224, 224) input size the ResNet50 model was built with.
    pixels = cv2.resize(pixels, (224, 224))
    return np.expand_dims(pixels, axis=0)


def image_feature_extraction(image1, image2):
    """Extract joint ResNet50 features for the two views of one study.

    Args:
        image1: Path of the first image.
        image2: Path of the second image.

    Returns:
        A tensor of shape (1, positions, channels): both feature maps are
        concatenated along the width axis and the spatial axes are flattened.
        Presumably (1, 98, 2048), matching the precomputed feature arrays used
        elsewhere in this notebook - TODO confirm.
    """
    # Each image is passed through the ResNet50 feature extractor on its own.
    first_out = resnet_model(_load_xray(image1))
    second_out = resnet_model(_load_xray(image2))
    # Concatenate along the width axis.
    conc = np.concatenate((first_out, second_out), axis=2)
    # Reshape into (number of images passed, height*width, depth).
    image_feature = tf.reshape(conc, (conc.shape[0], -1, conc.shape[-1]))
    return image_feature


In [None]:
# Row counts for the split; the two sizes always add up to len(data).
train_size = int(0.8 * len(data))  # 80% for training, adjust as needed
test_size = len(data) - train_size

In [None]:
# Positional, unshuffled split: the first `train_size` rows train, the remainder test.
train = data.iloc[:train_size]
test = data.iloc[train_size:]


In [None]:
len(train)

In [None]:
# train_features=np.zeros((2710,98,2048))


# for row in tqdm(range(train.shape[0])):
#     image_1=train.iloc[row]["Frontal"]
#     image_2=train.iloc[row]["Lateral"]
#     train_features[row]=(image_feature_extraction(image_1,image_2))


In [None]:
# test_features=np.zeros((678,98,2048))
# for row in tqdm(range(test.shape[0])):
#     image_1=test.iloc[row]["Frontal"]
#     image_2=test.iloc[row]["Lateral"]
#     test_features[row]=(image_feature_extraction(image_1,image_2))

In [None]:
# Load precomputed image features instead of re-running the extraction loops.
# NOTE(review): presumably produced by the commented-out loops in the cells above,
# in which case row order must match `train` / `test` - confirm.
train_features=np.load("/kaggle/input/image-features-data-resnet/train_features_RES.npy")
test_features=np.load("/kaggle/input/image-features-data-resnet/test_features_RES.npy")

In [None]:
print(train_features.shape)
print(test_features.shape)

In [None]:
# np.save('/kaggle/working/train_features_RES.npy', train_features)
# np.save('/kaggle/working/test_features_RES.npy', test_features)

In [None]:
# Three variants of every report:
#   *_report     - wrapped in both markers (used to fit the tokenizer / as reference text)
#   *_report_in  - start marker only (decoder input)
#   *_report_out - end marker only (decoder target)
train_report = [f"<sos> {text} <eos>" for text in train["report"].values]
train_report_in = [f"<sos> {text}" for text in train["report"].values]
train_report_out = [f"{text} <eos>" for text in train["report"].values]

test_report = [f"<sos> {text} <eos>" for text in test["report"].values]
test_report_in = [f"<sos> {text}" for text in test["report"].values]
test_report_out = [f"{text} <eos>" for text in test["report"].values]

In [None]:
print(train_report_in[0])
print("*"*100)
print(train_report_out[0])


In [None]:
# Batch size: passed to the model constructor and to model.fit.
bs=10
# Padded sequence length; also caps the number of tokens produced during beam search.
max_len=80


In [None]:
# '<' and '>' are deliberately absent from the filter set so the "<sos>" / "<eos>"
# markers survive tokenisation. The backslash is written as '\\' because '\]' is
# not a valid escape sequence; the resulting filter string is unchanged.
token = tf.keras.preprocessing.text.Tokenizer(filters='!"#$%&()*+,-./:;?@[\\]^_{|}~\t\n')

# The vocabulary is learned from the training reports only.
token.fit_on_texts(train_report)
# +1 because word indices start at 1; index 0 is the padding value.
vocab_size = len(token.word_index) + 1


def _encode_and_pad(texts):
    """Convert texts to id sequences and pad them at the end to `max_len`."""
    sequences = token.texts_to_sequences(texts)
    return tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=max_len, padding="post")


train_padded_inp = _encode_and_pad(train_report_in)
train_padded_out = _encode_and_pad(train_report_out)

test_padded_inp = _encode_and_pad(test_report_in)
test_padded_out = _encode_and_pad(test_report_out)

In [None]:
# Map every GloVe word to its vector.
embeddings_index = dict()
# `with` closes the file even if a line fails to parse; the explicit encoding
# avoids depending on the platform default.
with open('/kaggle/input/data-glove/glove.6B.300d.txt', encoding='utf-8') as f:
    for line in f:
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs
print("Done")

# create a weight matrix for words in training docs
# Rows of words without a GloVe vector (and row 0, the padding index) stay all-zero.
embedding_matrix = np.zeros((vocab_size, 300))
for word, i in tqdm(token.word_index.items()):
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector


In [None]:
# Width of the encoder's dense projection.
enc_units=64
# Word-embedding size; must match the 300-column embedding matrix built above.
embedding_dim=300
# Units of each direction of the decoder GRU.
dec_units=64
# Width of the attention scoring layers.
att_units=64

In [None]:
class Encoder(tf.keras.Model):
    """Transforms its input with dropout followed by a ReLU dense layer of `units` outputs."""

    def __init__(self, units):
        super().__init__()
        # Output width of the dense layer and of the zero states from initialize_states.
        self.units = units

    def build(self, input_shape):
        self.dense1 = tf.keras.layers.Dense(self.units, activation="relu", name="encoder_dense")
        # NOTE: despite its name, `maxpool` is a Dropout layer (rate 0.5), not a pooling layer.
        self.maxpool = tf.keras.layers.Dropout(0.5)

    def call(self, input_):
        """Apply dropout, then the dense projection, and return the result."""
        enc_out = self.maxpool(input_)
        enc_out = self.dense1(enc_out)
        return enc_out

    def initialize_states(self, batch_size):
        """Return two all-zero tensors of shape (batch_size, units)."""
        forward_h = tf.zeros((batch_size, self.units))
        back_h = tf.zeros((batch_size, self.units))
        return forward_h, back_h

In [None]:
class Attention(tf.keras.layers.Layer):
    """Additive attention: score = v(tanh(wa(encoder_output) + wb(decoder_state)))."""

    def __init__(self, att_units):
        super().__init__()
        # Width of the two projection layers `wa` and `wb`.
        self.att_units = att_units

    def build(self, input_shape):
        self.wa = tf.keras.layers.Dense(self.att_units)
        self.wb = tf.keras.layers.Dense(self.att_units)
        # Reduces each projected position to a single score.
        self.v = tf.keras.layers.Dense(1)

    def call(self, decoder_hidden_state, encoder_output):
        """Return (context_vector, alphas) for one decoder state.

        Assumes `decoder_hidden_state` is (batch, units) and `encoder_output`
        is (batch, positions, features) - TODO confirm against callers.
        """
        # Insert an axis at position 1 so the state broadcasts against the encoder output.
        x = tf.expand_dims(decoder_hidden_state, 1)
        alpha_dash = self.v(tf.nn.tanh(self.wa(encoder_output) + self.wb(x)))
        # Normalise the scores along axis 1.
        alphas = tf.nn.softmax(alpha_dash, 1)
        # Weighted sum of the encoder output using `alphas`; the trailing size-1 axis is dropped.
        context_vector = tf.matmul(encoder_output, alphas, transpose_a=True)[:,:,0]
        return context_vector, alphas

In [None]:
class One_Step_Decoder(tf.keras.Model):
    """Runs a single decoding step: attention, embedding, bidirectional GRU, vocabulary logits.

    NOTE: `Embedding` is imported in a later cell, so that cell must run before
    this class is instantiated.
    """

    def __init__(self, vocab_size, embedding_dim, input_length, dec_units, att_units):
        super().__init__()
        self.att_units = att_units
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.input_length = input_length
        self.dec_units = dec_units
        self.attention = Attention(self.att_units)
        # Frozen embedding initialised from the module-level `embedding_matrix`.
        self.embedding = Embedding(
            self.vocab_size,
            output_dim=self.embedding_dim,
            input_length=self.input_length,
            mask_zero=True,
            trainable=False,
            embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix)
        )
        self.gru = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(self.dec_units, return_sequences=True, return_state=True))
        # Projects the GRU output to one logit per vocabulary entry.
        self.dense = tf.keras.layers.Dense(self.vocab_size, name="decoder_final_dense")
        # Projects the attention context to the embedding width.
        self.dense_2 = tf.keras.layers.Dense(self.embedding_dim, name="decoder_dense2")
        # Dropout applied to the GRU output before the vocabulary projection.
        self.dropout = tf.keras.layers.Dropout(0.5)

    def call(self, input_to_decoder, encoder_output, for_h, bac_h):
        """Return (logits, forward_state, backward_state, attention_weights) for one step.

        Assumes `input_to_decoder` holds one token id per row, i.e. shape
        (batch, 1) - the concat below needs a single time step. TODO confirm.
        """
        embed = self.embedding(input_to_decoder)
        # The attention query is the element-wise sum of the two GRU direction states.
        state_h = tf.keras.layers.Add()([for_h, bac_h])
        context_vector, alpha = self.attention(state_h, encoder_output)
        context_vector = self.dense_2(context_vector)
        # Join context and token embedding along the feature axis as the GRU input.
        result = tf.concat([tf.expand_dims(context_vector, axis=1), embed], axis=-1)
        output, forward_h, back_h = self.gru(result, initial_state=[for_h, bac_h])
        # Collapse all leading axes so each row is one feature vector.
        out = tf.reshape(output, (-1, output.shape[-1]))
        out = self.dropout(out)
        dense_op = self.dense(out)
        return dense_op, forward_h, back_h, alpha

In [None]:
class Decoder(tf.keras.Model):
    """Unrolls `One_Step_Decoder` over every position of the decoder input."""

    def __init__(self, vocab_size, embedding_dim, output_length, dec_units, att_units):
        super().__init__()
        self.onestep = One_Step_Decoder(vocab_size, embedding_dim, output_length, dec_units, att_units)

    def call(self, input_to_decoder, encoder_output, state_1, state_2):
        """Return the per-step outputs stacked with the step axis moved to position 1.

        The number of steps is the static size of axis 1 of `input_to_decoder`.
        """
        all_outputs = tf.TensorArray(tf.float32, input_to_decoder.shape[1], name="output_array")
        for step in range(input_to_decoder.shape[1]):
            # Feed one column of token ids; the returned states seed the next step.
            output, state_1, state_2, alpha = self.onestep(input_to_decoder[:, step:step+1], encoder_output, state_1, state_2)
            all_outputs = all_outputs.write(step, output)
        # stack() puts the step axis first; swap it with the next axis.
        all_outputs = tf.transpose(all_outputs.stack(), [1, 0, 2])
        return all_outputs

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
class encoder_decoder(tf.keras.Model):
    """End-to-end model: encodes image features and decodes the report with teacher forcing.

    Args:
        enc_units: Width of the encoder's dense projection.
        embedding_dim: Word-embedding size used by the decoder.
        vocab_size: Number of entries in the output vocabulary.
        output_length: Sequence length passed to the decoder's embedding layer.
        dec_units: Units of each direction of the decoder GRU.
        att_units: Width of the attention scoring layers.
        batch_size: Nominal batch size. Stored for backward compatibility; the
            forward pass reads the actual batch size from its input.
    """

    def __init__(self, enc_units, embedding_dim, vocab_size, output_length, dec_units, att_units, batch_size):
        super().__init__()
        self.batch_size = batch_size
        self.encoder = Encoder(enc_units)
        self.decoder = Decoder(vocab_size, embedding_dim, output_length, dec_units, att_units)

    def call(self, data):
        """Return decoder outputs for `data = [image_features, decoder_input_ids]`."""
        features, report = data[0], data[1]
        encoder_output = self.encoder(features)
        # Size the initial states from the batch actually received, so a final
        # partial batch (or a dataset not divisible by `batch_size`) still works.
        # NOTE(review): the states have the encoder's width; this only suits the
        # GRU while enc_units == dec_units - confirm if those are ever changed.
        current_batch = tf.shape(features)[0]
        state_h, back_h = self.encoder.initialize_states(current_batch)
        output = self.decoder(report, encoder_output, state_h, back_h)
        return output

In [None]:
from tensorflow.keras.layers import Embedding

In [None]:
model  = encoder_decoder(enc_units,embedding_dim,vocab_size,max_len,dec_units,att_units,bs)

In [None]:
# Adam with its default settings.
optimizer = tf.keras.optimizers.Adam()

# Element-wise loss (reduction='none') on raw logits, so padded positions can be masked afterwards.
loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def custom_lossfunction(y_true, y_pred):
    """Cross-entropy that ignores padded target positions.

    Positions where `y_true` equals 0 contribute zero loss. The masked values
    are then averaged over every position, padded ones included.
    """
    per_position = loss_function(y_true, y_pred)
    # 1.0 where the target is a real token, 0.0 where it is padding.
    keep = tf.cast(tf.math.not_equal(y_true, 0), dtype=per_position.dtype)
    return tf.reduce_mean(per_position * keep)

In [None]:

model.compile(optimizer=optimizer,loss=custom_lossfunction)

In [None]:
# Multiply the learning rate by 0.2 after 2 epochs without val_loss improvement, never below 1e-4.
red_lr=tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss",factor=0.2,patience=2, min_lr=0.0001)
# Save to "model2wts.keras" whenever val_loss improves; save_weights_only=False stores the full model.
ckpt=tf.keras.callbacks.ModelCheckpoint("model2wts.keras",monitor='val_loss', verbose=0, save_best_only=True,save_weights_only=False, mode='auto')


In [None]:
# Inputs are [image features, decoder input ids]; targets are the decoder output ids.
# NOTE(review): the hard-coded slice sizes (2710 / 670) look chosen so both sets divide
# evenly by `bs`, since the model allocates its initial states for a fixed batch size - confirm.
model.fit([train_features[:2710],train_padded_inp[:2710]],train_padded_out[:2710],validation_data=([test_features[:670],test_padded_inp[:670]],test_padded_out[:670]),
          batch_size=bs,epochs=15,callbacks=[red_lr,ckpt])

In [None]:
def take_second(elem):
    """Return the item at index 1 of `elem`; used as the sort key for beam candidates."""
    second_item = elem[1]
    return second_item

In [None]:
import nltk.translate.bleu_score as bleu
import time

In [None]:
def beam_search(image1, image2, beam_index):
    """Generate a report for one study with beam search.

    Args:
        image1: Path of the first image.
        image2: Path of the second image.
        beam_index: Beam width, i.e. how many partial reports are kept per step.

    Returns:
        (report, score): the decoded text of the best beam and its cumulative
        log-probability. As before, the text starts with the "<sos>" marker and
        stops before "<eos>".
    """
    start_id = token.word_index["<sos>"]
    end_id = token.word_index["<eos>"]

    image_features = image_feature_extraction(image1, image2)
    encoder_out = model.encoder(image_features)

    # The GRU states have `dec_units` entries per direction.
    initial_state = tf.zeros((1, dec_units))

    # Each beam carries its own recurrent states: (token_ids, score, forward_state, backward_state).
    # Sharing a single pair of states across beams would continue every candidate
    # from whichever beam happened to be evaluated last.
    beams = [([start_id], 0.0, initial_state, initial_state)]

    # The start token occupies one position, so at most max_len - 1 tokens are generated.
    for _ in range(max_len - 1):
        candidates = []
        for tokens, score, forward_state, backward_state in beams:
            if tokens[-1] == end_id:
                # A finished report is carried forward unchanged.
                candidates.append((tokens, score, forward_state, backward_state))
                continue

            logits, next_forward, next_backward, _ = model.decoder.onestep(
                tf.expand_dims([tokens[-1]], 1), encoder_out, forward_state, backward_state
            )
            # Log-probabilities add up to a sequence log-likelihood; raw logits do not.
            log_probs = tf.nn.log_softmax(logits[0]).numpy()
            for word_id in np.argsort(log_probs)[-beam_index:]:
                candidates.append((
                    tokens + [int(word_id)],
                    score + float(log_probs[word_id]),
                    next_forward,
                    next_backward,
                ))

        # Keep the `beam_index` best candidates, best last.
        beams = sorted(candidates, key=take_second)[-beam_index:]
        if all(beam[0][-1] == end_id for beam in beams):
            break

    best_tokens, best_score = beams[-1][0], beams[-1][1]

    words = []
    for word_id in best_tokens:
        if word_id == 0:
            continue
        if word_id == end_id:
            break
        words.append(token.index_word[word_id])

    return " ".join(words), best_score

In [None]:
import random
start=time.time()
# Pick one random test study.
i=random.sample(range(test.shape[0]),1)[0]
img1=test.iloc[i]["Frontal"]
img2=test.iloc[i]["Lateral"]
# Show the corresponding X-ray images.
i1=cv2.imread(img1)
i2=cv2.imread(img2)
plt.figure(figsize=(10,6))
plt.subplot(131)
plt.title("image1")
plt.imshow(i1)
plt.subplot(132)
plt.title("image2")
plt.imshow(i2)
plt.show()
# Print the actual and generated reports (beam width 3).

result,score=beam_search(img1,img2,3)
actual=test_report[i]

print("ACTUAL REPORT: ",actual)
print("GENERATED REPORT: ",result)
end=time.time()
# sentence_bleu expects a list of tokenised references and a tokenised hypothesis;
# passing raw strings would score character overlap instead of word overlap.
print("BLEU SCORE IS: ",bleu.sentence_bleu([actual.split()],result.split()))
print("time required for the evaluation is ",end-start)

In [None]:
import random
start=time.time()
# Pick one random test study.
i=random.sample(range(test.shape[0]),1)[0]
img1=test.iloc[i]["Frontal"]
img2=test.iloc[i]["Lateral"]
# Show the corresponding X-ray images.
i1=cv2.imread(img1)
i2=cv2.imread(img2)
plt.figure(figsize=(10,6))
plt.subplot(131)
plt.title("image1")
plt.imshow(i1)
plt.subplot(132)
plt.title("image2")
plt.imshow(i2)
plt.show()
# Print the actual and generated reports (beam width 2).

result,score=beam_search(img1,img2,2)
actual=test_report[i]

print("ACTUAL REPORT: ",actual)
print("GENERATED REPORT: ",result)
end=time.time()
# sentence_bleu expects a list of tokenised references and a tokenised hypothesis;
# passing raw strings would score character overlap instead of word overlap.
print("BLEU SCORE IS: ",bleu.sentence_bleu([actual.split()],result.split()))
print("time required for the evaluation is ",end-start)

In [None]:
import random
start=time.time()
# Pick one random test study.
i=random.sample(range(test.shape[0]),1)[0]
img1=test.iloc[i]["Frontal"]
img2=test.iloc[i]["Lateral"]
# Show the corresponding X-ray images.
i1=cv2.imread(img1)
i2=cv2.imread(img2)
plt.figure(figsize=(10,6))
plt.subplot(131)
plt.title("image1")
plt.imshow(i1)
plt.subplot(132)
plt.title("image2")
plt.imshow(i2)
plt.show()
# Print the actual and generated reports (beam width 4).

result,score=beam_search(img1,img2,4)
actual=test_report[i]

print("ACTUAL REPORT: ",actual)
print("GENERATED REPORT: ",result)
end=time.time()
# sentence_bleu expects a list of tokenised references and a tokenised hypothesis;
# passing raw strings would score character overlap instead of word overlap.
print("BLEU SCORE IS: ",bleu.sentence_bleu([actual.split()],result.split()))
print("time required for the evaluation is ",end-start)

In [None]:
import random
start=time.time()
# Pick one random test study.
i=random.sample(range(test.shape[0]),1)[0]
img1=test.iloc[i]["Frontal"]
img2=test.iloc[i]["Lateral"]
# Show the corresponding X-ray images.
i1=cv2.imread(img1)
i2=cv2.imread(img2)
plt.figure(figsize=(10,6))
plt.subplot(131)
plt.title("image1")
plt.imshow(i1)
plt.subplot(132)
plt.title("image2")
plt.imshow(i2)
plt.show()
# Print the actual and generated reports (beam width 1, i.e. greedy decoding).

result,score=beam_search(img1,img2,1)
actual=test_report[i]

print("ACTUAL REPORT: ",actual)
print("GENERATED REPORT: ",result)
end=time.time()
# sentence_bleu expects a list of tokenised references and a tokenised hypothesis;
# passing raw strings would score character overlap instead of word overlap.
print("BLEU SCORE IS: ",bleu.sentence_bleu([actual.split()],result.split()))
print("time required for the evaluation is ",end-start)

In [None]:
# def evaluate_reports(test_data, beam_index):
#     actual_reports = []
#     generated_reports = []
#     bleu_scores = []

#     for i in tqdm(range(len(test_data))):
#         img1 = test_data.iloc[i]["Frontal"]
#         img2 = test_data.iloc[i]["Lateral"]

#         # Generate report
#         result, _ = beam_search(img1, img2, beam_index)
#         actual = test_report[i]

#         # Calculate BLEU score
#         bleu_score = bleu.sentence_bleu([actual.split()], result.split())

#         # Store results
#         actual_reports.append(actual)
#         generated_reports.append(result)
#         bleu_scores.append(bleu_score)

#     # Create a DataFrame to store the results
#     results_df = pd.DataFrame({
#         'Actual Report': actual_reports,
#         'Generated Report': generated_reports,
#         'BLEU Score': bleu_scores
#     })

#     return results_df

In [None]:
# results_df = evaluate_reports(test, beam_index=3)

In [None]:
# results_df.to_csv('report_evaluation.csv', index=False)