In [None]:
!pip install language-tool-python
!pip install rouge
!pip install tensorflow
!pip install openpyxl
!pip install nltk

In [None]:
import os
import csv
import pickle
import string
import random
import gc
import chardet
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import torch
import nltk
import re
import openpyxl

nltk.download('wordnet')

from nltk.translate.bleu_score import corpus_bleu, sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score, single_meteor_score
from rouge import Rouge
from transformers import AutoTokenizer, AutoModelForMaskedLM
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import (
    Input, Dense, Dropout, LSTM, Concatenate, Reshape,
    BatchNormalization, Attention, GlobalAveragePooling1D
)
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import load_img, img_to_array, to_categorical, pad_sequences, plot_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras import mixed_precision
from openpyxl.styles import Font, PatternFill
from openpyxl.utils.exceptions import IllegalCharacterError
from PIL import ImageFile
from pickle import dump

In [None]:
# Turn on memory growth for every GPU TensorFlow can see.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Report the problem and keep the notebook running.
        print(e)
    else:
        print("Memory growth enabled on the GPU.")

In [None]:
def detect_encoding(file_path):
    """Return the text encoding that chardet guesses for ``file_path``.

    The whole file is read as bytes and passed to ``chardet.detect``; the
    ``'encoding'`` entry of its result is returned unchanged.
    """
    with open(file_path, 'rb') as handle:
        payload = handle.read()
    return chardet.detect(payload)['encoding']

def load_caption_file(path):
    """Parse a caption file into ``{image_id: [sentence, ...]}``.

    Each line is split on a double tab; when that yields fewer than two
    fields, the line is split once on whitespace instead. The report part is
    cut at every "." and empty pieces are dropped. Lines that do not end up
    as exactly two fields are reported and skipped.
    """
    captions_dict = {}
    with open(path, "r", encoding=detect_encoding(path)) as handle:
        for raw_line in handle:
            fields = raw_line.split("\t\t")
            if len(fields) < 2:
                fields = raw_line.split(maxsplit=1)
            if len(fields) != 2:
                print(f"Skipping line due to unexpected format: {raw_line.strip()}")
                continue
            image_id, report = fields
            sentences = []
            for piece in report.split("."):
                piece = piece.strip()
                if piece:
                    sentences.append(piece)
            captions_dict[image_id] = sentences
    return captions_dict

def process_reports_in_groups(captions_dict, group_size=8):
    """Give every complete group of images the report of its first member.

    Keys are bucketed by their first four characters. Each bucket is cut into
    consecutive chunks of ``group_size``; incomplete chunks are discarded, and
    every key of a complete chunk is mapped to the report of the chunk's
    first key.
    """
    by_prefix = {}
    for image_key, report in captions_dict.items():
        by_prefix.setdefault(image_key[:4], []).append((image_key, report))

    grouped_captions_dict = {}
    for members in by_prefix.values():
        for start in range(0, len(members), group_size):
            chunk = members[start:start + group_size]
            if len(chunk) != group_size:
                continue
            shared_report = chunk[0][1]
            grouped_captions_dict.update(
                (image_key, shared_report) for image_key, _ in chunk
            )
    return grouped_captions_dict

def extract_features(directory, image_keys):
    """Compute VGG16 features for ``<directory>/<key>.jpg`` for each key.

    NOTE: a later ``def extract_features`` in this file rebinds the name, so
    this definition is not the one that is active after the cell has run.

    Returns a dict mapping the part of each key before its first "." to the
    array predicted by VGG16 truncated at its second-to-last layer.
    """
    backbone = VGG16()
    encoder = Model(inputs=backbone.inputs, outputs=backbone.layers[-2].output)
    features = {}
    for name in image_keys:
        pixels = img_to_array(
            load_img(os.path.join(directory, name + '.jpg'), target_size=(224, 224))
        )
        # Add a leading batch axis of size 1 before preprocessing.
        batch = pixels.reshape((1, pixels.shape[0], pixels.shape[1], pixels.shape[2]))
        batch = preprocess_input(batch)
        features[name.split('.')[0]] = encoder.predict(batch, verbose=0)
    return features

def load_large_pickle(file_path):
    """Open ``file_path`` in binary mode and return the unpickled object.

    NOTE(review): unpickling executes code embedded in the file; only use it
    on files produced by this notebook.
    """
    with open(file_path, 'rb') as handle:
        return pickle.load(handle)

def data_generator(image_keys, Nmax, group_size=8, batch_size=1):
    """Endlessly yield teacher-forcing batches for the multi-slot caption model.

    Reads the notebook globals ``train_validate_features``,
    ``train_validate_image_caption``, ``tokenizer``, ``max_len`` and
    ``vocab_len``.

    Args:
        image_keys: sequence of image ids; consecutive runs of ``group_size``
            ids are treated as one group.
        Nmax: number of caption slots (one text input and one output each).
        group_size: number of image inputs per sample.
        batch_size: number of groups consumed per yielded batch.

    Yields:
        ``(inputs, targets)`` where ``inputs`` is a tuple of ``group_size``
        float16 arrays followed by ``Nmax`` int32 arrays, and ``targets`` is a
        tuple of ``Nmax`` float32 arrays.

    Raises:
        ValueError: on the first ``next()`` if ``image_keys`` cannot fill one
            batch. Previously this case looped forever without yielding.
    """
    keys_per_batch = batch_size * group_size
    if keys_per_batch <= 0 or len(image_keys) < keys_per_batch:
        raise ValueError(
            f"data_generator needs at least batch_size * group_size = {keys_per_batch} "
            f"image keys to build one batch, got {len(image_keys)}"
        )
    while True:
        for i in range(0, len(image_keys), keys_per_batch):
            batch_keys = image_keys[i:i + keys_per_batch]
            if len(batch_keys) < keys_per_batch:
                # Drop the trailing partial batch and start the next pass.
                break
            x1 = [[] for _ in range(group_size)]
            x2, y = [[] for _ in range(Nmax)], [[] for _ in range(Nmax)]
            # batch_keys holds exactly keys_per_batch ids, so every slice is a full group.
            for start in range(0, len(batch_keys), group_size):
                group_keys = batch_keys[start:start + group_size]
                group_features = [train_validate_features[image][0] for image in group_keys]
                for image in group_keys:
                    captions_list = train_validate_image_caption.get(image, [])
                    for slot, caption in enumerate(captions_list):
                        seq = tokenizer.texts_to_sequences([caption.split()])[0]
                        # One sample per prefix: tokens [0, k) are the input, token k the target.
                        for k in range(1, len(seq)):
                            x2_seq = pad_sequences([seq[:k]], maxlen=max_len)[0]
                            y_seq = tf.keras.utils.to_categorical(seq[k], num_classes=vocab_len)
                            for idx in range(group_size):
                                x1[idx].append(group_features[idx])
                            # Only the slot of this caption gets real data; the others get zeros.
                            for m in range(Nmax):
                                if m == slot:
                                    x2[m].append(x2_seq)
                                    y[m].append(y_seq)
                                else:
                                    x2[m].append(np.zeros((max_len,), dtype=np.int32))
                                    y[m].append(np.zeros((vocab_len,), dtype=np.float32))
            yield (
                tuple(np.array(x, dtype=np.float16) for x in x1) +
                tuple(np.array(x, dtype=np.int32) for x in x2),
                tuple(np.array(y_seq, dtype=np.float32) for y_seq in y)
            )

def define_model(max_len: int, vocab_size: int, Nmax: int, group_size: int = 8):
    """Build the multi-image, multi-caption-slot Keras model.

    Args:
        max_len: length of each caption-prefix input.
        vocab_size: size of the softmax output of every slot.
        Nmax: number of caption slots; each adds one input and one output.
        group_size: number of 4096-d image-feature inputs.

    Returns:
        A ``tf.keras.Model`` whose inputs are the ``group_size`` image inputs
        followed by the ``Nmax`` caption inputs, and whose outputs are the
        ``Nmax`` softmax layers (uncompiled).
    """
    # Image branch, shared by all slots: concatenate the group's feature vectors
    # and project them to 2048 units.
    input_images = [Input(shape=(4096,)) for _ in range(group_size)]
    concatenated_images = Concatenate()(input_images)
    image_features = Dense(2048, activation='relu', kernel_regularizer=l2(1e-4))(concatenated_images)
    image_features = BatchNormalization()(image_features)
    image_features = Dropout(0.4)(image_features)
    input_captions = []
    outputs = []
    # One independent text branch and output head per caption slot.
    for i in range(Nmax):
        input_caption = Input(shape=(max_len,))
        input_captions.append(input_caption)
        # The caption input is reshaped to (max_len, 1) and fed straight to the
        # LSTM; there is no Embedding layer in this branch.
        reshaped_input_caption = Reshape((max_len, 1))(input_caption)
        lstm_seq = LSTM(128, return_sequences=True, recurrent_dropout=0.3, name=f'lstm_{i}')(reshaped_input_caption)
        # Self-attention: the LSTM sequence is passed as both list elements.
        attention_output = Attention(name=f'attention_{i}')([lstm_seq, lstm_seq])
        context_vector = GlobalAveragePooling1D()(attention_output)
        context_vector = Dropout(0.4)(context_vector)
        # Merge the shared image representation with this slot's text context.
        combined_features = Concatenate()([image_features, context_vector])
        dense = Dense(128, activation='relu', kernel_regularizer=l2(1e-4))(combined_features)
        dense = Dropout(0.3)(dense)
        output = Dense(vocab_size, activation='softmax')(dense)
        outputs.append(output)
    model = tf.keras.Model(inputs=input_images + input_captions, outputs=outputs)
    return model

def extract_features(image_paths, image_keys=None):
    """Compute VGG16 ``fc2`` features for a set of images.

    This definition replaces the earlier two-argument ``extract_features`` in
    this file, so it accepts both call forms:

    * ``extract_features(paths)``: ``image_paths`` is an iterable of image
      file paths; each result is keyed by the file name without extension.
    * ``extract_features(directory, image_keys)``: ``image_paths`` is a
      directory and the images are ``<directory>/<key>.jpg``; each result is
      keyed by the part of the key before its first ".".

    Returns:
        dict mapping image id to the array returned by ``model.predict`` for
        that single image.
    """
    if image_keys is not None:
        directory = image_paths
        sources = [
            (name.split('.')[0], os.path.join(directory, name + '.jpg'))
            for name in image_keys
        ]
    else:
        sources = [
            (os.path.splitext(os.path.basename(path))[0], path)
            for path in image_paths
        ]

    base_model = VGG16()
    model = Model(inputs=base_model.input, outputs=base_model.get_layer('fc2').output)
    features = {}
    for image_id, image_path in sources:
        image = load_img(image_path, target_size=(224, 224))
        image = img_to_array(image)
        image = preprocess_input(image)
        image = np.expand_dims(image, axis=0)
        # verbose=0 avoids one progress bar per image.
        features[image_id] = model.predict(image, verbose=0)
    return features

def words_for_id(integer, tokenizers):
    """Return the word each tokenizer assigns to the id ``integer``.

    Tokenizers that do not know the id are skipped. The lookup uses
    ``index_word`` (id -> word); testing the id against ``word_index``
    (word -> id) could never match and always produced an empty list.
    """
    return [
        tokenizer.index_word[integer]
        for tokenizer in tokenizers
        if integer in tokenizer.index_word
    ]

def generate_desc(model, tokenizers, photo, max_len, temperature=1.0, group_size=8):
    """Sample one sentence per caption slot for a single image.

    Args:
        model: model taking ``group_size`` image inputs followed by one text
            input per slot, and returning one distribution per slot.
        tokenizers: one tokenizer per caption slot; its length sets the
            number of text inputs that are fed.
        photo: feature array for the image; it is repeated for every image input.
        max_len: maximum number of generated words and the padded input length.
        temperature: divides the log-probabilities before resampling.
        group_size: number of image inputs of ``model`` (default 8, the value
            that used to be hard-coded).

    Returns:
        list with one generated string per tokenizer.
    """
    num_slots = len(tokenizers)
    all_predictions = []
    for j, tokenizer in enumerate(tokenizers):
        in_text = 'startseq'
        predicted_words = []
        for _ in range(max_len):
            sequence = tokenizer.texts_to_sequences([in_text])[0]
            padded_sequence = pad_sequences([sequence], maxlen=max_len)
            # Same photo on every image input; only slot j gets the running text.
            inputs = [photo] * group_size
            inputs += [np.zeros((1, max_len)) for _ in range(num_slots)]
            inputs[group_size + j] = padded_sequence
            yhat = model.predict(inputs, verbose=0)
            # Work in float64 so the 1e-10 floor does not underflow when the
            # model returns a lower-precision dtype.
            probs = np.asarray(yhat[j], dtype=np.float64).flatten()
            scaled = np.log(probs + 1e-10) / temperature
            scaled -= scaled.max()  # same softmax, avoids overflow at small temperatures
            probs = np.exp(scaled)
            probs /= probs.sum()
            next_index = np.random.choice(len(probs), p=probs)
            next_word = tokenizer.index_word.get(next_index, None)
            if next_word is not None and next_word != 'endseq':
                predicted_words.append(next_word)
                in_text += ' ' + next_word
            else:
                break
        prediction = ' '.join(predicted_words).replace(' endseq', '').strip()
        all_predictions.append(prediction)
    return all_predictions

def calculate_semantic_score(prediction):
    """Score ``prediction`` as the negated exponential of the BERT loss.

    The text is tokenised (truncated to 512 tokens) with the global
    ``bert_tokenizer`` and passed to the global ``bert_model`` with its own
    input ids as labels. Higher (closer to zero) is better.
    """
    encoded = bert_tokenizer(prediction, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        mlm_loss = bert_model(**encoded, labels=encoded["input_ids"]).loss
        perplexity = torch.exp(mlm_loss).item()
    return -perplexity

def calculate_heuristic_score(prediction):
    """Return ``-inf`` for blank text, otherwise the semantic score."""
    if not prediction.strip():
        return float('-inf')
    return calculate_semantic_score(prediction)

def clean_text(text):
    """Strip ASCII control characters (except tab, LF, CR) from strings.

    Non-string values are returned untouched.
    """
    if not isinstance(text, str):
        return text
    return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text)

def calculate_bleu_scores(reference, hypothesis):
    """Return smoothed sentence-level BLEU-1..4 as a 4-tuple.

    Args:
        reference: tokenised reference sentence (list of words).
        hypothesis: tokenised candidate sentence (list of words).

    BLEU-3 uses exact thirds; the previous ``0.33`` weights summed to 0.99,
    so the score was not a proper weighted geometric mean.
    """
    smoothing_function = SmoothingFunction().method1
    weight_sets = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        (1 / 3, 1 / 3, 1 / 3, 0),
        (0.25, 0.25, 0.25, 0.25),
    )
    return tuple(
        sentence_bleu([reference], hypothesis, weights=weights, smoothing_function=smoothing_function)
        for weights in weight_sets
    )

In [None]:
# Dataset locations (fill in before running).
image_dataset_path = ''
caption_dataset_path = ''

# Load the raw reports and share one report across each complete image group.
captions_dict = load_caption_file(caption_dataset_path)
grouped_captions_dict = process_reports_in_groups(captions_dict)

print("Number of images in the grouped_captions_dict dictionary:", len(grouped_captions_dict))
for image_id, captions in grouped_captions_dict.items():
    print(image_id, ":", captions)
print()

# Normalise every sentence: lower-case, strip punctuation, drop one-character
# tokens, then wrap it in the start/end markers.
table = str.maketrans('', '', string.punctuation)
new_captions_dict = {}
for caption_id, caption_list in grouped_captions_dict.items():
    cleaned_captions = []
    for caption_text in caption_list:
        normalised = (token.lower().translate(table) for token in caption_text.split())
        kept_tokens = [token for token in normalised if len(token) > 1]
        cleaned_captions.append('startseq ' + ' '.join(kept_tokens) + ' endseq')
    new_captions_dict[caption_id] = cleaned_captions

del grouped_captions_dict

# Largest number of sentences in any report; used as the number of caption slots.
Nmax = max(map(len, new_captions_dict.values()))
Nmax

In [None]:
# Keep only the images on disk that have a cleaned caption.
image_index = list(new_captions_dict.keys())
captioned_ids = set(image_index)  # O(1) membership instead of scanning the list per file
# os.listdir returns entries in arbitrary order; sorting keeps the seeded
# shuffle/split in the following cell reproducible across machines.
caption_images_list = [
    image.split('.')[0]
    for image in sorted(os.listdir(image_dataset_path))
    if image.split('.')[0] in captioned_ids
]

# Bucket the images by the first four characters of their name.
prefix_groups = {}
for image in caption_images_list:
    prefix_groups.setdefault(image[:4], []).append(image)

# Only buckets with exactly 8 images are used as groups.
grouped_images = [group for group in prefix_groups.values() if len(group) == 8]

In [None]:
# Shuffle whole groups with a fixed seed, then hold out 20% of the groups for testing.
random.seed(12)
random.shuffle(grouped_images)
# Flat list of every grouped image; it is not referenced again in the visible cells.
flattened_images = [image for group in grouped_images for image in group]
num_test_groups = int(0.20 * len(grouped_images))
test_groups = grouped_images[:num_test_groups]
train_validate_groups = grouped_images[num_test_groups:]
test_images = [image for group in test_groups for image in group]
train_validate_images = [image for group in train_validate_groups for image in group]
print(f"Number of training and validation slices: {len(train_validate_images)}")
print(f"Number of test slices: {len(test_images)}")
# NOTE(review): these two shuffles reorder individual images, so consecutive runs
# of 8 keys no longer belong to one prefix group. data_generator() forms its
# groups from consecutive keys, and the later train/validation cut is made on this
# shuffled list -- confirm that mixing images across groups is intended.
random.shuffle(train_validate_images)
random.shuffle(test_images)

In [None]:
ImageFile.LOAD_TRUNCATED_IMAGES = True
# The extract_features definition that is active after the function cell takes a
# list of file paths, so build the paths here instead of passing (directory, keys).
train_validate_image_paths = [
    os.path.join(image_dataset_path, image + '.jpg') for image in train_validate_images
]
train_validate_features = extract_features(train_validate_image_paths)
with open(r'.../train-val-features.pkl', 'wb') as f: # .../ --> insert path where to save features
    dump(train_validate_features, f)

# Reload from disk so the rest of the notebook works from the cached file.
train_validate_features = load_large_pickle('.../train-val-features.pkl')

del captions_dict

# Captions restricted to images that are in the train/validation split and have features.
# Set/dict membership replaces the list that was rebuilt and scanned for every image.
train_validate_image_set = set(train_validate_images)
train_validate_image_caption = {
    image: caption
    for image, caption in new_captions_dict.items()
    if image in train_validate_image_set and image in train_validate_features
}
len(train_validate_image_caption)

In [None]:
# Fit one tokenizer on every cleaned caption; index 0 is left free, hence the +1.
all_captions = [caption for captions_list in new_captions_dict.values() for caption in captions_list]
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
vocab_len = 1 + len(tokenizer.word_index)

# Longest caption (in whitespace-separated words) among the train/validation images.
longest_per_image = (
    max(len(caption.split()) for caption in captions_list)
    for captions_list in train_validate_image_caption.values()
)
max_len = max(longest_per_image)

# Show a few captions next to their integer encoding.
for caption in all_captions[:6]:
    sequence = tokenizer.texts_to_sequences([caption])[0]
    print("Original caption:", caption)
    print("Tokenized sequence:", sequence)
    print()

In [None]:
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# 85% / 15% train/validation cut over the train_validate_images list.
total_train_validate = len(train_validate_images)
num_validate_images = int(0.15 * total_train_validate)
num_train_images = total_train_validate - num_validate_images
group_size = 8
batch_size = 32


def _make_caption_dataset(image_keys):
    """Wrap data_generator(image_keys, ...) in a prefetching tf.data.Dataset.

    The signature has ``group_size`` float16 image-feature inputs followed by
    ``Nmax`` int32 caption inputs, and ``Nmax`` float32 targets. The number of
    image inputs follows ``group_size`` instead of a separate literal 8.
    """
    output_signature = (
        tuple(tf.TensorSpec(shape=(None, 4096), dtype=tf.float16) for _ in range(group_size)) +
        tuple(tf.TensorSpec(shape=(None, max_len), dtype=tf.int32) for _ in range(Nmax)),
        tuple(tf.TensorSpec(shape=(None, vocab_len), dtype=tf.float32) for _ in range(Nmax))
    )
    return tf.data.Dataset.from_generator(
        lambda: data_generator(image_keys, Nmax, group_size=group_size, batch_size=batch_size),
        output_signature=output_signature
    ).prefetch(tf.data.AUTOTUNE)  # use .prefetch(2) instead to limit memory usage


train_tf_dataset = _make_caption_dataset(train_validate_images[:num_train_images])
validate_tf_dataset = _make_caption_dataset(train_validate_images[num_train_images:])

In [None]:
vocab_size = vocab_len
model = define_model(max_len, vocab_size, Nmax, group_size=group_size)
optimizer = Adam(learning_rate=0.0001, clipnorm=1.0)
model.compile(optimizer=optimizer, loss='categorical_crossentropy')

# Weights are written after every epoch (save_best_only=False).
checkpoint_cb = ModelCheckpoint(
    '.../checkpoint-with-attention-EN.weights.h5', # .../ --> insert path where to save model checkpoints
    save_best_only=False,
    save_weights_only=True,
    save_freq='epoch'
)
earlystop_cb = EarlyStopping(patience=5, restore_best_weights=True)
callbacks = [checkpoint_cb, earlystop_cb]

# Each batch yielded by data_generator consumes batch_size * group_size images,
# so that product (not batch_size alone) is the number of images per step.
# Dividing by batch_size made every "epoch" cycle through the data group_size times.
images_per_step = batch_size * group_size
history = model.fit(
    train_tf_dataset,
    epochs=100,
    validation_data=validate_tf_dataset,
    steps_per_epoch=max(1, num_train_images // images_per_step),
    validation_steps=max(1, num_validate_images // images_per_step),
    callbacks=callbacks
)

# model.save writes the whole model; a later cell reads this file with load_weights.
model.save('.../model-weights-with-attention-EN.h5') # .../ --> insert path where to save the model

In [None]:
# Training log to parse. NOTE(review): no visible cell writes this file; it
# presumably holds the saved console output of model.fit -- confirm.
file_path = ".../history-with-attention-EN.txt"

train_loss_pattern = re.compile(r" loss: ([0-9]+\.[0-9]+) ")
val_loss_pattern = re.compile(r"val_loss: ([0-9]+\.[0-9]+)")

loss = []
val_loss = []
with open(file_path, "r", encoding="utf-8") as f:
    lines = f.readlines()
# Keep only the lines that report both a training and a validation loss.
for line in lines:
    train_loss_match = train_loss_pattern.search(line)
    val_loss_match = val_loss_pattern.search(line)
    if not (train_loss_match and val_loss_match):
        continue
    loss.append(float(train_loss_match.group(1)))
    val_loss.append(float(val_loss_match.group(1)))

print("Epochs:", len(loss))
print("Initial loss:", loss[:5])
print("Initial val loss:", val_loss[:5])

fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(range(1, len(loss) + 1), loss, label='TRAINING LOSS')
ax.plot(range(1, len(val_loss) + 1), val_loss, label='VALIDATION LOSS')
if val_loss:
    # Mark the epoch with the lowest validation loss.
    min_epoch = val_loss.index(min(val_loss)) + 1
    ax.scatter(min_epoch, min(val_loss), color='red', zorder=5, label=f'MIN VAL LOSS (epoch {min_epoch})')
ax.set_xlabel('EPOCH')
ax.set_ylabel('LOSS')
ax.set_title('LEARNING CURVES')
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Rebuild the architecture and load the trained weights.
model = define_model(max_len, vocab_size, Nmax, group_size=8)
model.load_weights('.../model-weights-with-attention-EN.h5')
for i, layer in enumerate(model.inputs):
    print(f"Input {i+1}: {layer.shape}")

bert_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
bert_model = AutoModelForMaskedLM.from_pretrained("bert-base-uncased")

# One tokenizer per caption slot. The model was built with Nmax slots, so the
# slot count follows Nmax instead of a literal 9.
num_slots = Nmax
caption_corpus = [caption for _, captions in new_captions_dict.items() for caption in captions]
tokenizers = [Tokenizer() for _ in range(num_slots)]
for tokenizer in tokenizers:
    tokenizer.fit_on_texts(caption_corpus)

predicted_captions = []
actual_captions = []
image_names = []
csv_file_path = '.../output-semantic-with-attention-EN.csv' # .../ --> insert path where to save csv file
# Explicit UTF-8 so the file does not depend on the platform's default encoding.
with open(csv_file_path, 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(['Image Prefix', 'Predicted Report', 'Actual Report'])
    # Regroup the test images by the first four characters of their name.
    prefix_groups = {}
    for image in test_images:
        prefix_groups.setdefault(image[:4], []).append(image)
    for prefix, images in prefix_groups.items():
        print(f"Frames of CT scan number: {prefix}")
        all_captions_list = [new_captions_dict[image] for image in images]
        image_paths = [os.path.join(image_dataset_path, image + '.jpg') for image in images]
        image_features = extract_features(image_paths)
        photos = [np.array([image_features[image][0]]) for image in images]
        # group_predictions[i] collects the slot-i sentence generated for every frame.
        group_predictions = [[] for _ in range(num_slots)]
        for photo, image in zip(photos, images):
            predictions = generate_desc(model, tokenizers, photo, max_len)
            captions_list = new_captions_dict[image]
            num_actual_captions = len(captions_list)
            # Blank the slots beyond the number of sentences in the actual report.
            for i in range(num_actual_captions, min(num_slots, len(predictions))):
                predictions[i] = ""
            for i in range(min(num_slots, len(predictions))):
                group_predictions[i].append(predictions[i])
            for j, prediction in enumerate(predictions):
                if j < num_actual_captions:
                    print(f"Prediction for predictor {j + 1}: {prediction}")
                else:
                    print(f"Prediction for predictor {j + 1}: (empty)")
            print('---')
        best_predictions = [""] * num_slots
        # Scores are negated perplexities (never above -1), so the running best
        # must start at -inf; starting at -1 meant no candidate was ever selected.
        best_scores = [float('-inf')] * num_slots
        for i in range(num_slots):
            best_prediction_for_position = None
            for prediction in group_predictions[i]:
                if not prediction.strip():
                    continue
                heuristic_score = calculate_heuristic_score(prediction)
                if heuristic_score > best_scores[i]:
                    best_scores[i] = heuristic_score
                    best_prediction_for_position = prediction
            if best_prediction_for_position is None:
                # No usable candidate: fall back to the first entry, or to "".
                if group_predictions[i]:
                    best_prediction_for_position = group_predictions[i][0]
                else:
                    best_prediction_for_position = ""
            best_predictions[i] = best_prediction_for_position
        predicted_captions.append(best_predictions)
        actual_captions.append([word for word in all_captions_list[0][0].split() if word not in ['startseq', 'endseq']])
        # Actual report of the first frame, with the start/end markers removed.
        actual_sentences = [
            ' '.join([word for word in caption.split() if word not in ['startseq', 'endseq']])
            for caption in all_captions_list[0]
        ]
        print()
        print(f"Best prediction for frames of CT scan number {prefix}: {best_predictions}")
        print()
        print("Predicted -> ", best_predictions)
        print("Actual -> ", actual_sentences)
        print('*********************************************************************')
        print()
        image_names.append(prefix)
        filtered_preds = [element for element in best_predictions if element]
        best_preds = " ".join(filtered_preds)
        actuals = " ".join(actual_sentences)
        writer.writerow([prefix, best_preds, actuals])

In [None]:
csv_file_path = '.../output-semantic-with-attention-EN.csv'
# Read the prefix column as text so numeric-looking prefixes keep any leading zeros.
df = pd.read_csv(csv_file_path, dtype={'Image Prefix': str})

file_path = '.../output-semantic-with-attention-EN.xlsx' # .../ --> insert path where to save xlsx file
# Remove control characters before writing the workbook.
df = df.map(clean_text)
df.to_excel(file_path, index=False, engine='openpyxl')
df = pd.read_excel(file_path)

# An empty report comes back as NaN; normalise both text columns to strings so
# .split() below always works.
report_columns = ['Predicted Report', 'Actual Report']
df[report_columns] = df[report_columns].fillna('').astype(str)

rouge = Rouge()
bleu1_scores = []
bleu2_scores = []
bleu3_scores = []
bleu4_scores = []
rouge1_scores = []
rouge2_scores = []
rougeL_scores = []
meteor_scores = []
for index, row in df.iterrows():
    reference = row['Actual Report'].split()
    hypothesis = row['Predicted Report'].split()
    if reference and hypothesis:
        bleu1, bleu2, bleu3, bleu4 = calculate_bleu_scores(reference, hypothesis)
        rouge_scores = rouge.get_scores(' '.join(hypothesis), ' '.join(reference))[0]
        rouge1 = rouge_scores['rouge-1']['f']
        rouge2 = rouge_scores['rouge-2']['f']
        rougeL = rouge_scores['rouge-l']['f']
        meteor = meteor_score([reference], hypothesis)
    else:
        # Rouge.get_scores raises on an empty hypothesis or reference; an empty
        # report scores zero on every metric instead of aborting the loop.
        bleu1 = bleu2 = bleu3 = bleu4 = 0.0
        rouge1 = rouge2 = rougeL = 0.0
        meteor = 0.0
    bleu1_scores.append(bleu1)
    bleu2_scores.append(bleu2)
    bleu3_scores.append(bleu3)
    bleu4_scores.append(bleu4)
    rouge1_scores.append(rouge1)
    rouge2_scores.append(rouge2)
    rougeL_scores.append(rougeL)
    meteor_scores.append(meteor)

df['BLEU-1'] = bleu1_scores
df['BLEU-2'] = bleu2_scores
df['BLEU-3'] = bleu3_scores
df['BLEU-4'] = bleu4_scores
df['ROUGE-1'] = rouge1_scores
df['ROUGE-2'] = rouge2_scores
df['ROUGE-L'] = rougeL_scores
df['METEOR'] = meteor_scores
global_bleu1 = df['BLEU-1'].mean()
global_bleu2 = df['BLEU-2'].mean()
global_bleu3 = df['BLEU-3'].mean()
global_bleu4 = df['BLEU-4'].mean()
global_rouge1 = df['ROUGE-1'].mean()
global_rouge2 = df['ROUGE-2'].mean()
global_rougeL = df['ROUGE-L'].mean()
global_meteor = df['METEOR'].mean()
# Summary row: blanks under the non-metric columns, then the eight column means.
df.loc['Global Average'] = [''] * (len(df.columns) - 8) + [global_bleu1, global_bleu2, global_bleu3, global_bleu4, global_rouge1, global_rouge2, global_rougeL, global_meteor]

output_file_path = '.../output-semantic&scores-with-attention-EN.xlsx' # insert path where to save xlsx file

df.to_excel(output_file_path, index=False)
# Highlight the summary (last) row in bold red.
wb = openpyxl.load_workbook(output_file_path)
ws = wb.active
last_row = ws.max_row
red_font = Font(color="FF0000", bold=True)
for cell in ws[last_row]:
    cell.font = red_font
wb.save(output_file_path)