In [None]:
import PIL
import matplotlib.pyplot as plt
import string
import os
import numpy as np
import pandas as pd
import pickle
import nltk
import random
import chardet
import csv
import tensorflow as tf
#print(tf.__version__)
#import ace_tools as tools
import openpyxl
#import language_tool_python
import torch

nltk.download('wordnet')

from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Embedding, Add, Concatenate, Reshape, Bidirectional
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import load_img, img_to_array, to_categorical, plot_model, pad_sequences
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from IPython.display import Image
from PIL import ImageFile
from pickle import dump
from nltk.translate.bleu_score import corpus_bleu, sentence_bleu, SmoothingFunction
from rouge import Rouge
from nltk.translate.meteor_score import meteor_score, single_meteor_score
from openpyxl.styles import Font, PatternFill
from transformers import AutoTokenizer, AutoModelForMaskedLM

In [None]:
def detect_encoding(file_path):
    """Guess the text encoding of a file with chardet.

    The whole file is read as bytes and handed to ``chardet.detect``.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The value stored under ``'encoding'`` in chardet's result.
    """
    with open(file_path, 'rb') as handle:
        payload = handle.read()

    return chardet.detect(payload)['encoding']

In [None]:
def load_caption_file(path):
    """Read a tab-separated caption file into a dictionary.

    Each usable line holds exactly one tab: the image id on the left and the
    report on the right. Any other line is reported on stdout and skipped.
    The file is decoded with the encoding returned by ``detect_encoding``.

    Args:
        path: Location of the caption file.

    Returns:
        dict mapping image id to its whitespace-trimmed report.
    """
    reports_by_image = {}

    with open(path, "r", encoding=detect_encoding(path)) as handle:
        for raw_line in handle:
            record = raw_line.strip()

            # Exactly one tab <=> exactly two fields.
            if record.count("\t") != 1:
                print(f"Skipping line due to unexpected format: {record}")
                continue

            image_id, _, report = record.partition("\t")
            reports_by_image[image_id] = report.strip()

    return reports_by_image

In [None]:
def process_reports_in_groups(captions_dict, group_size=5):
    """Give every image of a complete group the report of the group's first image.

    Images are bucketed by the first four characters of their key, keeping
    insertion order. Each bucket is cut into consecutive chunks of
    ``group_size``; chunks that are not full are dropped.

    Args:
        captions_dict: Mapping of image key to report text.
        group_size: Number of images that make up one group.

    Returns:
        dict mapping each retained image key to the report of the first image
        in its chunk.
    """
    by_prefix = {}
    for image_key, text in captions_dict.items():
        by_prefix.setdefault(image_key[:4], []).append((image_key, text))

    shared_reports = {}
    for entries in by_prefix.values():
        for start in range(0, len(entries), group_size):
            chunk = entries[start:start + group_size]
            if len(chunk) != group_size:
                continue

            first_report = chunk[0][1]
            shared_reports.update((image_key, first_report) for image_key, _ in chunk)

    return shared_reports

In [None]:
def extract_features(directory, image_keys):
    """Compute VGG16 second-to-last-layer features for images in a directory.

    NOTE(review): a later cell defines another ``extract_features`` that takes
    a single list of image paths. Once that cell has run it replaces this
    definition, so this two-argument form is not the one in effect after
    Run All.

    Args:
        directory: Folder holding the ``.jpg`` files.
        image_keys: File names without the ``.jpg`` extension.

    Returns:
        dict mapping the part of each key before its first ``.`` to the array
        returned by ``model.predict`` for that image.
    """
    vgg = VGG16()
    # Cut the network at its second-to-last layer and use that as the output.
    extractor = Model(inputs=vgg.inputs, outputs=vgg.layers[-2].output)

    features = {}
    for key in image_keys:
        picture = load_img(os.path.join(directory, key + '.jpg'), target_size=(224, 224))
        pixels = img_to_array(picture)
        # Add the leading batch axis expected by predict.
        batch = preprocess_input(np.expand_dims(pixels, axis=0))
        features[key.split('.')[0]] = extractor.predict(batch, verbose=0)

    return features

In [None]:
def prepare_data(image_keys, group_size=5):
    """Build next-word training samples from consecutive groups of images.

    Reads the module-level ``train_validate_features``,
    ``train_validate_image_caption``, ``tokenizer``, ``max_len`` and
    ``vocab_len``. The keys are consumed ``group_size`` at a time; the caption
    of the first key in a group is used for the whole group. For every
    position in the encoded caption one sample is emitted: the padded prefix as
    input and the one-hot next token as target, paired with the group's
    feature vectors.

    Args:
        image_keys: Sequence of image keys, expected to be ordered so that
            each run of ``group_size`` keys forms one group.
        group_size: Number of images per group.

    Returns:
        Tuple ``(x1, x2, y)`` where ``x1`` is a list of ``group_size`` arrays
        (one per image position), ``x2`` the padded caption prefixes and ``y``
        the one-hot targets.
    """
    slot_features = [[] for _ in range(group_size)]
    caption_prefixes = []
    next_word_targets = []

    for start in range(0, len(image_keys), group_size):
        scan_keys = image_keys[start:start + group_size]
        if len(scan_keys) < group_size:
            # Only the trailing chunk can be short, so nothing follows it.
            break

        scan_vectors = [train_validate_features[key][0] for key in scan_keys]
        caption_tokens = train_validate_image_caption[scan_keys[0]].split()
        encoded = tokenizer.texts_to_sequences([caption_tokens])[0]

        for cut in range(1, len(encoded)):
            prefix = pad_sequences([encoded[:cut]], maxlen=max_len)[0]
            target = to_categorical([encoded[cut]], num_classes=vocab_len)[0]

            # Every sample carries one feature vector per image position.
            for slot, vector in zip(slot_features, scan_vectors):
                slot.append(vector)

            caption_prefixes.append(prefix)
            next_word_targets.append(target)

    return [np.array(slot) for slot in slot_features], np.array(caption_prefixes), np.array(next_word_targets)

In [None]:
def define_model(max_len, vocab_size, group_size=5):
    """Assemble the multi-image next-word prediction network.

    ``group_size`` inputs of 4096 values are concatenated and passed through a
    4096-unit ReLU layer. The caption input of length ``max_len`` is reshaped
    to ``(max_len, 1)`` and fed to a 256-unit LSTM. Both branches are
    concatenated, passed through a 256-unit ReLU layer and a softmax over
    ``vocab_size`` classes.

    NOTE(review): the caption branch feeds the integer token ids to the LSTM
    as plain numbers (no Embedding layer) - confirm this is intended.

    Args:
        max_len: Length of the padded caption sequence.
        vocab_size: Number of output classes.
        group_size: Number of image-feature inputs.

    Returns:
        Uncompiled ``Model`` whose inputs are the image inputs followed by the
        caption input.
    """
    slice_inputs = [Input(shape=(4096,)) for _ in range(group_size)]
    image_branch = Dense(4096, activation='relu')(Concatenate()(slice_inputs))

    caption_input = Input(shape=(max_len,))
    caption_branch = LSTM(256)(Reshape((max_len, 1))(caption_input))

    fused = Concatenate()([image_branch, caption_branch])
    hidden = Dense(256, activation='relu')(fused)
    word_probabilities = Dense(vocab_size, activation='softmax')(hidden)

    return Model(inputs=[*slice_inputs, caption_input], outputs=word_probabilities)

In [None]:
def extract_features(image_paths, image_keys=None):
    """Extract VGG16 ``fc2`` features for a set of images.

    Two call forms are supported:

    * ``extract_features(image_paths)`` - an iterable of image file paths
      (a single path string is also accepted).
    * ``extract_features(directory, image_keys)`` - a directory plus file
      names without the ``.jpg`` extension. This form is used by the
      train/validation feature-extraction cell.

    The VGG16-based extractor is built on first use and cached on the function
    object, so repeated calls do not reload the ImageNet weights.

    Args:
        image_paths: Iterable of image paths, a single path, or (when
            ``image_keys`` is given) the directory containing the images.
        image_keys: Optional file names without extension, resolved as
            ``<image_paths>/<key>.jpg``.

    Returns:
        dict mapping each file's base name without extension to the array
        returned by ``model.predict`` for that image.
    """
    if image_keys is not None:
        # Two-argument form: the first argument is the directory.
        image_paths = [os.path.join(image_paths, name + '.jpg') for name in image_keys]
    elif isinstance(image_paths, (str, os.PathLike)):
        # A lone path would otherwise be iterated character by character.
        image_paths = [image_paths]

    model = getattr(extract_features, '_fc2_model', None)
    if model is None:
        base_model = VGG16(weights='imagenet')
        model = Model(inputs=base_model.input, outputs=base_model.get_layer('fc2').output)
        extract_features._fc2_model = model

    features = {}

    for image_path in image_paths:
        image = load_img(image_path, target_size=(224, 224))
        image = img_to_array(image)
        image = preprocess_input(image)

        # Add the leading batch axis expected by predict.
        image = np.expand_dims(image, axis=0)

        # verbose=0: one progress bar per image would flood the cell output.
        feature = model.predict(image, verbose=0)

        image_id = os.path.splitext(os.path.basename(image_path))[0]

        features[image_id] = feature

    return features

In [None]:
def generate_desc(model, tokenizer, photo, max_len, temperature=1.0):
    """Sample a report word by word from the trained model.

    Generation starts from ``'startseq'`` and stops after ``max_len`` steps,
    when ``'endseq'`` is sampled, or when the sampled index has no entry in
    ``tokenizer.index_word``.

    Args:
        model: Model whose ``predict`` takes the image-feature inputs followed
            by the padded caption sequence.
        tokenizer: Fitted tokenizer providing ``texts_to_sequences`` and
            ``index_word``.
        photo: Either one feature array, which is replicated five times (the
            original behaviour), or a list/tuple of feature arrays - one per
            image input of the model - which is passed through unchanged so
            that every slice of a scan can be supplied.
        max_len: Padded caption length and maximum number of sampling steps.
        temperature: Positive softmax temperature; values below 1 sharpen the
            distribution, values above 1 flatten it.

    Returns:
        The generated words joined by single spaces (without the sequence
        markers).

    Raises:
        ValueError: If ``temperature`` is not positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    if isinstance(photo, (list, tuple)):
        photo_inputs = list(photo)
    else:
        photo_inputs = [photo] * 5

    in_text = 'startseq'
    predicted_words = []

    for _ in range(max_len):
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        padded_sequence = pad_sequences([sequence], maxlen=max_len)

        yhat = model.predict(photo_inputs + [padded_sequence], verbose=0)

        # float64 keeps the renormalised probabilities summing to 1 closely
        # enough for np.random.choice.
        probabilities = np.asarray(yhat, dtype=np.float64).flatten()

        # Temperature scaling in log space; subtracting the maximum before
        # exponentiating avoids overflow for small temperatures.
        logits = np.log(probabilities + 1e-10) / temperature
        logits -= logits.max()
        weights = np.exp(logits)
        probabilities = weights / weights.sum()

        next_index = np.random.choice(len(probabilities), p=probabilities)
        next_word = tokenizer.index_word.get(next_index, None)
        if next_word is None or next_word == 'endseq':
            break

        predicted_words.append(next_word)
        in_text += ' ' + next_word

    return ' '.join(predicted_words).strip()

In [None]:
def calculate_bleu_scores(reference, hypothesis):
    """Compute smoothed sentence-level BLEU-1 to BLEU-4.

    BLEU-N uses uniform weights of exactly ``1/N`` on the first N n-gram
    orders and zero on the rest, so each weight vector sums to 1 (the previous
    BLEU-3 weights of 0.33 summed to 0.99).

    Args:
        reference: Tokenised reference report (list of words).
        hypothesis: Tokenised generated report (list of words).

    Returns:
        Tuple ``(bleu1, bleu2, bleu3, bleu4)``.
    """
    smoothing_function = SmoothingFunction().method1

    scores = []
    for order in range(1, 5):
        weights = tuple(1.0 / order if position < order else 0 for position in range(4))
        scores.append(
            sentence_bleu([reference], hypothesis, weights=weights, smoothing_function=smoothing_function)
        )

    return tuple(scores)

In [None]:
# Dataset locations - both must be filled in before running the cells below.
# Later cells use the first four characters of each file stem as the scan
# prefix and append '.jpg' when loading an image. The caption file is read as
# one "<image id><TAB><report>" record per line.
image_dataset_path = '' # dir containing slices (i.e., XXXX_SliceYYYY)
caption_dataset_path = '' # file containing reports

In [None]:
# Raw mapping: image id -> report, as read from the tab-separated caption file.
captions_dict = load_caption_file(caption_dataset_path)

# Within every complete group of five images sharing a four-character prefix,
# all images receive the report of the group's first image; images that do not
# fall into a complete group are dropped.
grouped_captions_dict = process_reports_in_groups(captions_dict)

In [None]:
# Normalise every report: lower-case, strip punctuation, drop tokens of one
# character or less, then wrap the result in the start/end sequence markers.
table = str.maketrans('', '', string.punctuation)

new_captions_dict = {}

for caption_id, caption_text in grouped_captions_dict.items():
    normalised_words = (word.lower().translate(table) for word in caption_text.split())
    kept_words = [word for word in normalised_words if len(word) > 1]
    new_captions_dict[caption_id] = 'startseq ' + ' '.join(kept_words) + ' endseq'

In [None]:
# os.listdir returns entries in arbitrary order, which would make the seeded
# shuffle further down produce a different split on another machine. Sorting
# fixes the order; the set removes duplicate stems (e.g. "a.jpg" next to
# "a.txt") that would otherwise inflate a scan's slice count.
available_stems = sorted({filename.split('.')[0] for filename in os.listdir(image_dataset_path)})
caption_images_list = [stem for stem in available_stems if stem in new_captions_dict]

# Bucket the slices by scan: the first four characters of the stem.
prefix_groups = {}

for image in caption_images_list:
    prefix_groups.setdefault(image[:4], []).append(image)

# Only scans with exactly five captioned slices are usable by the model.
grouped_images = [group for group in prefix_groups.values() if len(group) == 5]

In [None]:
# Shuffle whole scans (groups of slices), never individual slices.
random.seed(12)
random.shuffle(grouped_images)

flattened_images = [image for group in grouped_images for image in group]

# Hold out 20% of the scans for testing.
num_test_groups = int(0.20 * len(grouped_images))

test_groups = grouped_images[:num_test_groups]
train_validate_groups = grouped_images[num_test_groups:]

test_images = [image for group in test_groups for image in group]
train_validate_images = [image for group in train_validate_groups for image in group]

# The flattened lists are deliberately NOT shuffled again: prepare_data cuts
# train_validate_images into consecutive chunks and uses the first image's
# report for the whole chunk, so each chunk must hold the slices of one scan.
# Shuffling individual slices would mix scans inside a chunk. Sample order is
# still randomised during training by model.fit(shuffle=True).

In [None]:
# Let PIL load image files that are truncated instead of raising.
ImageFile.LOAD_TRUNCATED_IMAGES = True

# The extract_features definition in effect at this point takes a list of
# image paths (the later definition replaces the earlier two-argument one),
# so build the full paths here instead of passing directory and keys.
train_validate_image_paths = [os.path.join(image_dataset_path, image + '.jpg') for image in train_validate_images]
train_validate_features = extract_features(train_validate_image_paths)

# Cache the features so the extraction does not have to be repeated.
with open(r'.../train-val-features.pkl', 'wb') as f:
    dump(train_validate_features, f)

In [None]:
# Reload the cached train/validation features written by the previous cell.
# NOTE: pickle.load executes arbitrary code from the file - only load a cache
# produced by this notebook, never one obtained from an untrusted source.
with open(r'.../train-val-features.pkl', 'rb') as file:
    train_validate_features = pickle.load(file)

In [None]:
# Captions of the train/validation images for which features are available.
train_validate_image_caption = {
    image: new_captions_dict[image]
    for image in train_validate_images
    if image in train_validate_features
}

# NOTE(review): the vocabulary and max_len are derived from ALL captions,
# including those of the test scans.
all_captions = [*new_captions_dict.values()]
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)

# +1 because tokenizer indices start at 1; index 0 is left for padding.
vocab_len = 1 + len(tokenizer.word_index)
max_len = max(map(lambda caption: len(caption.split()), all_captions))

In [None]:
# prepare_data consumes its keys in consecutive chunks of five, so the
# train/validation boundary must fall on a multiple of five. Otherwise every
# validation chunk would start mid-scan and mix slices of two scans.
slices_per_scan = 5

total_train_validate = len(train_validate_images)
total_scans = total_train_validate // slices_per_scan

# Reserve 15% of the scans (not of the individual slices) for validation.
num_validate_scans = int(0.15 * total_scans)
num_validate_images = num_validate_scans * slices_per_scan
num_train_images = (total_scans - num_validate_scans) * slices_per_scan

train_x1, train_x2, train_y = prepare_data(train_validate_images[:num_train_images], group_size=slices_per_scan)
validate_x1, validate_x2, validate_y = prepare_data(train_validate_images[num_train_images:], group_size=slices_per_scan)

In [None]:
# vocab_size is an alias of vocab_len (tokenizer vocabulary + 1).
vocab_size = vocab_len
# Five image-feature inputs plus one caption input; see define_model.
model = define_model(max_len, vocab_size, group_size=5)

learning_rate = 0.001
optimizer = Adam(learning_rate=learning_rate)

# Categorical cross-entropy matches the one-hot next-word targets that
# prepare_data builds with to_categorical.
model.compile(optimizer=optimizer, loss='categorical_crossentropy')

In [None]:
# Stop once val_loss has not improved for 8 epochs. restore_best_weights=True
# rolls the model back to its best epoch; without it the model saved below
# would carry the weights of the last (already degraded) epoch.
callbacks = [EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)]

# Inputs are the five per-slice feature arrays followed by the caption prefixes.
history = model.fit(train_x1 + [train_x2],
                    train_y,
                    verbose=1,
                    epochs=100,
                    batch_size=32,
                    shuffle=True,
                    callbacks=callbacks,
                    validation_data=(validate_x1 + [validate_x2], validate_y))

model.save('.../model-weights.h5')

In [None]:
# Training vs. validation loss per epoch.
fig, ax = plt.subplots(figsize=(12, 10))
ax.plot(history.history['loss'])
ax.plot(history.history['val_loss'])
ax.set_title('LEARNING CURVES')
ax.set_xlabel('EPOCHS')
ax.set_ylabel('LOSS')
ax.legend(['Loss Train', 'Loss Val'], loc='upper right')
plt.show()

In [None]:
csv_file_path = '.../output-semantic.csv'

# Regroup the test slices by scan (first four characters of the image id).
# A dedicated name is used so the all-scans `prefix_groups` built earlier is
# not overwritten.
test_prefix_groups = {}

for image in test_images:
    test_prefix_groups.setdefault(image[:4], []).append(image)

# Extract the features of all test slices in one call: extract_features
# builds the VGG16 network on every call, so calling it once per scan would
# reload the weights for every scan.
test_image_paths = [os.path.join(image_dataset_path, image + '.jpg') for image in test_images]
test_image_features = extract_features(test_image_paths)

# generate_desc samples with np.random; seed it so the reports are repeatable.
np.random.seed(12)

# Write UTF-8 explicitly: the scoring cell reads this file with pandas, whose
# default encoding is UTF-8, while open() would otherwise use the locale's.
with open(csv_file_path, 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(['Image Prefix', 'Predicted Report', 'Actual Report'])

    for prefix, images in test_prefix_groups.items():
        print(f"Slices of scan number: {prefix}")

        # One (1, n_features) array per slice of the scan.
        photos = [np.array([test_image_features[image][0]]) for image in images]

        actual_report = new_captions_dict[images[0]]

        # NOTE(review): only the first slice is passed. generate_desc
        # replicates a single feature array across all five image inputs,
        # whereas training fed five distinct slices per scan. Supplying all
        # of `photos` requires generate_desc to accept a list of arrays.
        generated_report = generate_desc(model, tokenizer, photos[0], max_len)

        print(f"Predicted Report: {generated_report}")
        print(f"Actual Report: {actual_report}")
        print('------------')

        writer.writerow([prefix, generated_report, actual_report])

In [None]:
csv_file_path = '.../output-semantic.csv'
# dtype=str keeps every column textual, so a scan prefix such as "0001" is not
# parsed as the integer 1 and a numeric-looking report stays a string.
df = pd.read_csv(csv_file_path, dtype=str)

file_path = '.../output-semantic.xlsx'
df.to_excel(file_path, index=False, engine='openpyxl')
df = pd.read_excel(file_path)

rouge = Rouge()

metric_columns = ['BLEU-1', 'BLEU-2', 'BLEU-3', 'BLEU-4', 'ROUGE-1', 'ROUGE-2', 'ROUGE-L', 'METEOR']
scores = {column: [] for column in metric_columns}

# The stored reference still carries the 'startseq'/'endseq' training markers,
# which the generated report does not contain; leaving them in would
# systematically lower every score.
sequence_markers = {'startseq', 'endseq'}

for index, row in df.iterrows():
    reference, hypothesis = [], []
    if pd.notnull(row['Actual Report']) and pd.notnull(row['Predicted Report']):
        reference = [token for token in str(row['Actual Report']).split() if token not in sequence_markers]
        hypothesis = [token for token in str(row['Predicted Report']).split() if token not in sequence_markers]

    if reference and hypothesis:
        bleu1, bleu2, bleu3, bleu4 = calculate_bleu_scores(reference, hypothesis)
        rouge_scores = rouge.get_scores(' '.join(hypothesis), ' '.join(reference))[0]
        meteor = meteor_score([reference], hypothesis)
        row_scores = [
            bleu1, bleu2, bleu3, bleu4,
            rouge_scores['rouge-1']['f'],
            rouge_scores['rouge-2']['f'],
            rouge_scores['rouge-l']['f'],
            meteor,
        ]
    else:
        # Missing or empty reports score zero (Rouge raises on an empty
        # hypothesis, so it must not be called here).
        row_scores = [0] * len(metric_columns)

    for column, value in zip(metric_columns, row_scores):
        scores[column].append(value)

for column in metric_columns:
    df[column] = scores[column]

# Averages are taken before the summary row is appended, and they include the
# zero-scored rows.
global_averages = df[metric_columns].mean()

df.loc['Global Average'] = [''] * (len(df.columns) - len(metric_columns)) + global_averages.tolist()

output_file_path = '.../output-semantic&scores.xlsx'
df.to_excel(output_file_path, index=False)

# Highlight the summary row (the last row of the sheet) in bold red.
wb = openpyxl.load_workbook(output_file_path)
ws = wb.active

last_row = ws.max_row
red_font = Font(color="FF0000", bold=True)
for cell in ws[last_row]:
    cell.font = red_font

wb.save(output_file_path)

print(df.head())