In [None]:
import os
import glob as gb
import cv2
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from skimage.io import imread, imshow
from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Add, Dense, Activation, Flatten
from tensorflow.keras.optimizers import SGD, Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import load_model
from keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import classification_report
from mlxtend.plotting import plot_confusion_matrix

## Loading the decomposed dataset 

In [None]:
# Root folder of the (decomposed) training images
trainpath = '...../training_set/'

# Network input resolution and mini-batch size
img_height, img_width = 224, 224
batch_size = 64

# One generator serves both subsets: it multiplies pixel values by 1/255
# and holds back 20% of the images for validation.
train_datagen = ImageDataGenerator(rescale=1./255, validation_split=0.2)

print("The data is being split into training and validation set")

# Arguments shared by the training and validation iterators
_flow_options = dict(
    target_size=(img_height, img_width),
    batch_size=batch_size,
    class_mode='categorical',
)

# Training portion of the split
train_generator = train_datagen.flow_from_directory(
    trainpath, subset='training', **_flow_options)
print("----------------------------------------------------------------")

# Validation portion, read from the same directory
validation_generator = train_datagen.flow_from_directory(
    trainpath, subset='validation', **_flow_options)

In [None]:
# `class_indices` is the generator's mapping from class (sub-folder) name to integer label index
class_names = train_generator.class_indices
print(class_names)

## Loading the test set

In [None]:
# Root folder of the test images.
# NOTE(review): `testpath` was never assigned anywhere in the notebook, so this
# cell raised a NameError on a fresh kernel. The value below is a placeholder in
# the same style as `trainpath` — replace it with the real test-set location.
testpath = '...../test_set/'

# Test images are only rescaled (no validation split)
test_datagen = ImageDataGenerator(rescale=1./255)

test_generator = test_datagen.flow_from_directory(
    testpath,  # root directory of the test images
    target_size=(img_height, img_width),
    batch_size=1,  # one image per batch
    class_mode='categorical',
    shuffle=False,  # keep the images in a fixed order so predictions can be matched to their labels/filenames
    seed=42)

In [None]:
# Pull the whole test set into memory as (x_test, y_test).
# Rewind first so that re-running this cell always starts at the first image.
test_generator.reset()

x_test, y_test = [], []
# len(generator) is the number of batches needed to see every sample once
for _ in range(len(test_generator)):
    # builtin next() works on both old and new Keras iterators; `.next()` does not exist on newer ones
    batch_images, batch_labels = next(test_generator)
    x_test.extend(batch_images)
    y_test.extend(batch_labels)

# Stack into single arrays: a plain Python list of per-image arrays would be
# interpreted by Model.predict as multiple model inputs rather than one batch.
x_test = np.array(x_test)
y_test = np.array(y_test)
y_test.shape

## Fine-tuning the parameters learned by the pretext model on a new downstream task

In [None]:
# Restore the network saved after pretext training and print its architecture
pretext_model_path = '........../pretext_CXR.hdf5'
pretext_training = load_model(pretext_model_path)
pretext_training.summary()

In [None]:
# Build a truncated copy of the pretext network that stops at its 'dense' layer,
# dropping whatever comes after it (the pretext classification output).
# NOTE(review): assumes 'dense' is the layer directly before the pretext classifier — confirm in the summary printed above.
feature_output = pretext_training.get_layer('dense').output
model = Model(inputs=pretext_training.input, outputs=feature_output)
model.summary()

In [None]:
# Adding the new classification output layer corresponding to the new downstream task.
# The number of output units is the number of class folders seen by the training
# generator (the original referenced `train_label`, which is never defined).
num_classes = len(train_generator.class_indices)
new_prediction = Dense(num_classes, activation='softmax', name="new_task")(model.output)

# Building the final 4S-DT model
S4_TD_model = Model(inputs=model.input, outputs=new_prediction)

# Freeze all layers initially; the training loop unfreezes them progressively
for layer in S4_TD_model.layers:
    layer.trainable = False

# Total number of layers
total_layers = len(S4_TD_model.layers)

# Number of layers to progressively unfreeze
# 4S-DT model used the 4 last layers
N = 4

S4_TD_model.summary()

## Training the model

In [None]:
# Progressive fine-tuning: from last layer up to last N layers.
# For each stage i, the last i layers are trainable, the model is recompiled and
# trained, the checkpointed weights are reloaded, and the model is evaluated on
# the in-memory test set.
#
# NOTE(review): `training_opt` is not defined in any visible cell. It is expected
# to return the Keras callbacks for a stage and, presumably, to checkpoint the
# best weights to `<results_dir>/<layer_name>.hdf5` — confirm it is defined
# before this cell runs.

# Folder the stage weights are read back from (placeholder path)
results_dir = '....../CXR/results/'

# k used to decompose each original class of the training set into sub-classes
k_value = 5

# Test-set class names ordered by their label index, used to label the report
test_labels = sorted(test_generator.class_indices, key=test_generator.class_indices.get)
label_ids = list(range(len(test_labels)))

# Ground-truth class indices; constant across stages
y_true = np.argmax(y_test, axis=1)

for i in range(1, N + 1):
    print(f"\n--- Training with last {i} layer(s) unfrozen ---")

    # Unfreeze the last i layers
    for layer in S4_TD_model.layers[-i:]:
        layer.trainable = True

    # Compile the model again after changing trainable layers
    S4_TD_model.compile(
        optimizer=SGD(learning_rate=0.001, momentum=0.9),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Define callbacks
    layer_name = f"last_{i}_layers"
    callbacks = training_opt(layer_name)

    # Train the model
    history = S4_TD_model.fit(
        train_generator,
        validation_data=validation_generator,
        epochs=50,
        callbacks=callbacks,
        verbose=1
    )

    # Load the best weights of this stage (loaded once; the original loaded them
    # twice from two different placeholder paths)
    best_weights_path = os.path.join(results_dir, f"{layer_name}.hdf5")
    S4_TD_model.load_weights(best_weights_path)

    # Predict on the test set; np.asarray guarantees a single batched input even
    # if x_test is still a list of per-image arrays
    y_pred_probs = S4_TD_model.predict(np.asarray(x_test))
    y_pred = np.argmax(y_pred_probs, axis=1)

    # Error correction: map each predicted sub-class index back to its original class
    corrected_pred = y_pred // k_value

    # Confusion matrix on the corrected predictions, so it is in the same label
    # space as y_true and agrees with the classification report below
    cm = confusion_matrix(y_true, corrected_pred, labels=label_ids)
    fig, ax = plot_confusion_matrix(conf_mat=cm, figsize=(5, 5))
    plt.title(f"Confusion Matrix: {layer_name}")
    plt.show()

    # Classification report
    print(f"🧾 Classification Report (Layer: {layer_name}):")
    print(classification_report(y_true, corrected_pred, labels=label_ids, target_names=test_labels))