In [1]:
import numpy as np
from numpy import random
import random
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler,normalize
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression
import os
from tensorflow.keras.callbacks import Callback
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.optimizers
from tensorflow.keras.layers import Flatten,concatenate,Dense,MaxPooling2D,Conv2D,BatchNormalization
import radiomics
import SimpleITK as sitk
import pandas as pd


# Load the Training and Testing Data

We load the CT scans from two folders (one per class). The directory listing is sorted so that the images are always loaded in the same order.

In [3]:
# Class sub-folders of DATA_DIRECTORY. A folder's position in this list is its
# integer label: index 0 ('pos') is positive, index 1 ('neg') is negative.
CLASS_NAMES = ['pos', 'neg']
DATA_DIRECTORY = './covid19/kcv/'

In [4]:

def load_training():
    """Read every image under ``DATA_DIRECTORY/<class_name>`` into memory.

    The classes in ``CLASS_NAMES`` are visited in order, and a class's position
    in that list is used as the label of its images. File names are sorted
    within each class so the load order does not depend on the order in which
    the operating system lists the directory.

    Returns:
        tuple: ``(X, Y)`` where ``X`` is a list of image arrays, each reshaped
        to ``(512, 512, 1)``, and ``Y`` is the list of matching integer labels.
    """
    images, labels = [], []
    for label in range(len(CLASS_NAMES)):
        folder = os.path.join(DATA_DIRECTORY, CLASS_NAMES[label])
        for entry in sorted(os.listdir(folder)):
            scan = sitk.ReadImage(os.path.join(folder, entry))
            # Add a trailing channel axis; assumes each file holds exactly
            # 512 * 512 values, otherwise reshape raises.
            images.append(sitk.GetArrayFromImage(scan).reshape(512, 512, 1))
            labels.append(label)
    return images, labels

In [5]:
# X: list of image arrays, Y: list of integer class labels (same order as X).
X, Y = load_training()

We load the features that we previously extracted from the images (using the procedure described in the section above).

In [7]:
def load_features(feature_files):
    """Load every file in ``feature_files`` with ``np.load``.

    Args:
        feature_files: iterable of paths accepted by ``np.load``.

    Returns:
        list: the loaded objects, in the same order as ``feature_files``.
    """
    loaded = []
    for path in feature_files:
        loaded.append(np.load(path))
    return loaded

# Saved feature arrays, one .npy file per feature.
# NOTE(review): presumably each array holds one entry per image, in the same
# order as the loaded images -- confirm against the extraction step.
feature_files = [
    'original_firstorder_Skewness.npy',
    'original_glcm_Autocorrelation.npy',
    'original_glrlm_GrayLevelVariance.npy',
    'original_glszm_SizeZoneNonUniformity.npy',
    'original_glcm_ClusterShade.npy',
    'original_glcm_DifferenceEntropy.npy',
]

Split the data into k-fold groups (in this case `n_splits = 10`),
then prepare the input and output data for each of the 10 folds.

In [10]:
def _stack_folds(folds):
    """Combine a list of per-fold sequences into one array indexed by fold.

    Folds of identical shape are stacked into a regular ndarray. Folds of
    different lengths cannot form a rectangular array, so they are returned as
    a 1-D object array whose elements are the per-fold arrays.
    """
    arrays = [np.asarray(fold) for fold in folds]
    if len({arr.shape for arr in arrays}) <= 1:
        # All folds agree in shape (or there are none): plain stacking.
        return np.array(arrays)
    ragged = np.empty(len(arrays), dtype=object)
    for position, arr in enumerate(arrays):
        ragged[position] = arr
    return ragged


def prepare_kfold_data(X, Y, feature_arrays, n_splits=10):
    """Split images, labels and feature arrays into shuffled K-fold groups.

    The split uses ``KFold(n_splits, shuffle=True, random_state=1)``, so it is
    the same on every call for a given number of samples.

    Args:
        X: sequence of samples (indexed with integer positions).
        Y: sequence of labels aligned with ``X``.
        feature_arrays: sequence of arrays that support index-array selection
            along their first axis (e.g. ``np.ndarray``), aligned with ``X``.
        n_splits: number of folds.

    Returns:
        dict: keys ``train_group``, ``test_group``, ``train_labels``,
        ``test_labels`` plus ``train_<i>`` / ``test_<i>`` for each feature
        array (``i`` starts at 1; keys 1..6 always exist and are empty arrays
        when fewer than six feature arrays are given). Indexing a value with a
        fold number yields that fold's data. When every fold has the same
        size the value is a regular ndarray; when fold sizes differ
        (``len(X)`` not divisible by ``n_splits``) it is a 1-D object array of
        per-fold arrays, because unequal folds cannot be stacked.
    """
    kfold = KFold(n_splits=n_splits, shuffle=True, random_state=1)
    data = {f'train_{i}': [] for i in range(1, 7)}
    data.update({f'test_{i}': [] for i in range(1, 7)})
    data['train_group'], data['test_group'] = [], []
    data['train_labels'], data['test_labels'] = [], []

    for train_idx, test_idx in kfold.split(X):
        for i, feature_array in enumerate(feature_arrays, 1):
            # setdefault keeps the six pre-created keys and adds further ones
            # on demand, so more than six feature arrays no longer KeyError.
            data.setdefault(f'train_{i}', []).append(feature_array[train_idx])
            data.setdefault(f'test_{i}', []).append(feature_array[test_idx])
        data['train_group'].append([X[idx] for idx in train_idx])
        data['test_group'].append([X[idx] for idx in test_idx])
        data['train_labels'].append([Y[idx] for idx in train_idx])
        data['test_labels'].append([Y[idx] for idx in test_idx])

    return {key: _stack_folds(folds) for key, folds in data.items()}

# Load the saved feature arrays, then build the per-fold train/test groups of
# images, labels and features (default number of folds).
features = load_features(feature_files)
datasets = prepare_kfold_data(X, Y, feature_arrays=features)

# FINs

An ensemble of FINs

In this part we load the FINs, attach them to a DFNN, and compare the performance.

In [12]:
def load_and_rename_models(model_paths):
    """Load each saved Keras model and name it after its dictionary key.

    Args:
        model_paths: mapping of model name -> path passed to
            ``keras.models.load_model``.

    Returns:
        dict: model name -> loaded model, in the iteration order of
        ``model_paths``.
    """
    loaded = {}
    for key in model_paths:
        net = keras.models.load_model(model_paths[key])
        # Overwrites the private ``_name`` attribute; presumably done so the
        # loaded models have distinct names when combined -- TODO confirm.
        net._name = key
        loaded[key] = net
    return loaded

def make_FIN(models, input_shape=(512, 512, 1), model_name="FINsEssemble"):
    """Build a binary classifier that fuses the loaded models with a small CNN.

    Every model in ``models`` is applied to the same input tensor. Their
    outputs are concatenated with the flattened output of a two-stage
    Conv/MaxPool branch, batch-normalised, passed through three 32-unit ReLU
    layers and finally a single sigmoid unit.

    Args:
        models: mapping whose values are callable on a Keras tensor of shape
            ``input_shape``. Their outputs are presumably 2-D ``(batch, k)``
            tensors so they can be concatenated with the flattened CNN
            branch -- TODO confirm.
        input_shape: shape of one input sample, without the batch axis.
        model_name: name given to both the returned model and its output layer.

    Returns:
        keras.Model: the assembled model (not compiled here).
    """
    image_in = keras.Input(shape=input_shape)

    # Outputs of the preloaded models, all fed from the same input tensor.
    branch_outputs = [sub_model(image_in) for sub_model in models.values()]

    # CNN branch: two identical Conv(64, 3x3, ReLU) -> MaxPool(2x2) stages.
    cnn = image_in
    for _ in range(2):
        cnn = Conv2D(64, (3, 3), strides=(1, 1), activation='relu')(cnn)
        cnn = MaxPooling2D(pool_size=(2, 2))(cnn)
    cnn = Flatten()(cnn)

    # Fuse the model outputs with the CNN features, then classify.
    fused = concatenate(branch_outputs + [cnn])
    hidden = BatchNormalization()(fused)
    for _ in range(3):
        hidden = Dense(
            32,
            activation='relu',
            kernel_initializer=tf.keras.initializers.GlorotNormal(),
        )(hidden)

    prediction = Dense(1, activation='sigmoid', name=model_name)(hidden)
    return keras.Model(inputs=image_in, outputs=prediction, name=model_name)

# Saved FIN checkpoints; each key becomes the name of the loaded model.
model_paths = {
    'feature1': 'Skewness5.h5',
    'feature2': 'Autocorrelation5.h5',
    'feature3': 'GrayLevelVariance5.h5',
    'feature4': 'SizeZoneNonUniformity5.h5',
    'feature5': 'ClusterShade5.h5',
    'feature6': 'DifferenceEntropy5.h5',
}

# Load every checkpoint and rename the resulting model after its key.
models = load_and_rename_models(model_paths=model_paths)

# Assemble the composite model from the loaded FINs plus the CNN branch.
FinModel = make_FIN(models=models)


In [None]:
class HistoryLogger(Callback):
    """Callback that prints accuracy and loss at the end of every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Print training (and, when present, validation) accuracy and loss.

        Args:
            epoch: epoch index as passed by the training loop; shown as
                ``epoch + 1``.
            logs: metric dictionary for the epoch; ``None`` is treated as empty.
        """
        metrics = logs if logs else {}
        accuracy, loss = metrics.get('accuracy'), metrics.get('loss')
        print(f"Epoch {epoch + 1}:")
        print(f"Accuracy: {accuracy}, Loss: {loss}")

        # Validation figures are only reported when they were computed.
        if 'val_accuracy' not in metrics:
            return
        val_accuracy, val_loss = metrics.get('val_accuracy'), metrics.get('val_loss')
        print(f"Validation Accuracy: {val_accuracy}, Validation Loss: {val_loss}")
            
# Callback instance that prints per-epoch metrics during training.
history_logger = HistoryLogger()

# Fold used for this run (zero-based): index 7 is the 8th fold. Kept in one
# place so the four selections below cannot drift apart.
FOLD_INDEX = 7
train_images = datasets['train_group'][FOLD_INDEX]
train_labels = datasets['train_labels'][FOLD_INDEX]
test_images = datasets['test_group'][FOLD_INDEX]
test_labels = datasets['test_labels'][FOLD_INDEX]

In [None]:
# Training configuration: Adam with learning rate 0.001, binary cross-entropy
# loss, and accuracy plus AUC as reported metrics.
FinModel.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=['accuracy', tf.keras.metrics.AUC()],
)

In [None]:
# Train on the selected fold's training split and validate on its held-out
# split after every epoch; per-epoch metrics are printed by history_logger.
history_fin = FinModel.fit(
    x=train_images,
    y=train_labels,
    batch_size=64,
    epochs=10,
    shuffle=True,
    validation_data=(test_images, test_labels),
    callbacks=[history_logger],
)