Import required modules

In [1]:
import os
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import json
import patient_data
import cnn
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dense, Conv2D, MaxPool2D, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import categorical_crossentropy


2024-12-01 22:39:29.827539: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-01 22:39:29.844897: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1733092769.864225 1224268 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1733092769.870029 1224268 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-01 22:39:29.891171: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

Access the folder path for the cancer and the non-cancer images

In [2]:
all_paths = json.loads(open("./paths.json").read())

personal_path = all_paths['personal_path']
non_cancerous_path = personal_path + all_paths['non_cancerous_path']
cancerous_path = personal_path + all_paths['cancerous_path']

Load in all the DICOM files and preprocess/label images

In [3]:
# Using the patient_data data structure, load in all the patient data and save it in a dictionary with the folder name as the key
def load_all_patients(path, add_label = False):
    patients = {}
    folder = os.listdir(path)
    for name in folder:
        patients[name] = patient_data.Patient(os.path.join(path, name))
        if add_label:
            if patients[name].segpath == None:
                print(name, "was not processed correctly")
                patients.pop(name)
            else:
                patients[name].label_imgs()
    return patients

# nc_patients = load_all_patients(non_cancerous_path)
c_patients = load_all_patients(cancerous_path, True)


Setting up train/test data

In [4]:
# # not sure if we need this
# # create a list for the merged data
# x = []
# y = []

# create a list for only the cancerous dataset data
x_c = []
y_c = []
# # create a list for only the non-cancerous dataset data
# x_nc = []
# y_nc = []

for patient in c_patients.values():
    for i, img in enumerate(patient.ct.data.values()):
        x_c.append(img)
        y_c.append(patient.labels[i])
        # # not sure if we need this
        # x.append(img)
        # y.append(patient.labels[i])

x_c, y_c = shuffle(x_c, y_c)
c_patients = None

# for patient in nc_patients.values():
#     for i, img in enumerate(patient.ct.images):
#         x_nc.append(img)
#         y_nc.append(patient.labels[i])
#         # # not sure if we need this
#         # x.append(img)
#         # y.append(patient.labels[i])

# # not sure if we need this
# # Shuffle the merged data
# combined = list(zip(x, y))
# np.random.shuffle(combined)
# x2, y2 = zip(*combined)

# def generate_train_test():
#     # to ensure equal distribution of non-cancer to cancer data, split the data before merging it
#     x_train, x_test, y_train, y_test = train_test_split(x_c, y_c, test_size=0.2, random_state=42)

#     # x_train_add, x_test_add, y_train_add, y_test_add = train_test_split(x_nc, y_nc, test_size=0.2, random_state=42)
#     # x_train.extend(x_train_add) 
#     # x_test.extend(x_test_add) 
#     # y_train.extend(y_train_add) 
#     # y_test.extend(y_test_add) 

#     # Convert lists to arrays
#     x_train = np.array(x_train)/255
#     x_test = np.array(x_test)/255
#     y_train = np.array(y_train)
#     y_test = np.array(y_test)

#     return x_train, x_test, y_train, y_test, x_train[0]

# x_train, x_test, y_train, y_test, test = generate_train_test()

In [5]:
physical_devices = tf.config.experimental.list_physical_devices('GPU')
print('Num GPUs Available: ', len(physical_devices))
if len(physical_devices) > 0:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)

Num GPUs Available:  1


Saving the model template as a JSON

In [6]:
# only need to run this function once ever, unless you chose to change the model
if not os.path.isfile("models/model.json"):
    model = Sequential([
            Conv2D(filters=32, kernel_size=(3, 3), activation='relu', padding='same', input_shape=(512, 512, 1)),
            MaxPool2D(pool_size=(2, 2), strides=2),
            Conv2D(filters=64, kernel_size=(3, 3), activation='relu', padding='same'),
            MaxPool2D(pool_size=(2, 2), strides=2),
            Flatten(),
            Dense(units=1, activation='sigmoid')
        ])
    model = model.to_json()
    with open("models/model.json", "w") as json_file:
        json_file.write(model)

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report

# in case the user is importing the model from a previously saved JSON
json_file = open('models/model.json', 'r')
model_json = json_file.read()
json_file.close()

# Define K-Fold Cross-Validation
n_splits = 5
kfold = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
x_data = np.array(x_c)/255  # Normalize the images
y_data = np.array(y_c)
# Model training and evaluation loop
fold_results = []

for fold, (train_idx, val_idx) in enumerate(kfold.split(x_c, y_c)):
    print(f"\nTraining fold {fold + 1}/{n_splits}")
    
    # Split data
    x_train, x_val = x_data[train_idx], x_data[val_idx]
    y_train, y_val = y_data[train_idx], y_data[val_idx]

    # Build the model
    model = tf.keras.models.model_from_json(model_json)
    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    # Train the model
    model.fit(
        x=x_train,
        y=y_train,
        validation_data=(x_val, y_val),
        batch_size=10,
        epochs=20,
        verbose=1
    )

    # Evaluate the model
    predictions = (model.predict(x_val) > 0.5).astype("int32")
    report = classification_report(y_val, predictions, output_dict=True)
    print(classification_report(y_val, predictions))
    # Save fold results
    model.save(f"models/kfold_{i}_model.keras", fold)
    model = None
    fold_results.append(report)

# Aggregate results
avg_accuracy = np.mean([fold['accuracy'] for fold in fold_results])
print(f"\nAverage Accuracy Across {n_splits} Folds: {avg_accuracy:.4f}")


Training fold 1/5


In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing import image
import matplotlib.pyplot as plt
import numpy as np

# Grad-CAM function to compute heatmap
def get_gradcam_heatmap(model, img_array, last_conv_layer_name, pred_index=None):
    grad_model = Model([model.inputs], [model.get_layer(last_conv_layer_name).output, model.output])
    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]
    grads = tape.gradient(class_channel, conv_outputs)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    conv_outputs = conv_outputs[0]
    heatmap = conv_outputs @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()

# Function to overlay Grad-CAM heatmap on original image
def display_gradcam(img_path, heatmap, alpha=0.4):
    img = image.load_img(img_path)
    img = image.img_to_array(img)
    heatmap = np.uint8(255 * heatmap)
    jet = plt.cm.get_cmap("jet")
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]
    jet_heatmap = image.array_to_img(jet_heatmap).resize((img.shape[1], img.shape[0]))
    jet_heatmap = image.img_to_array(jet_heatmap)
    superimposed_img = jet_heatmap * alpha + img
    return image.array_to_img(superimposed_img)

In [None]:
# Saliency map computation function
def compute_saliency_map(model, img_array, pred_index=None):
    img_array = tf.convert_to_tensor(img_array)
    with tf.GradientTape() as tape:
        tape.watch(img_array)
        predictions = model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        loss = predictions[:, pred_index]
    grads = tape.gradient(loss, img_array)
    saliency = tf.reduce_max(tf.abs(grads), axis=-1).numpy()
    return saliency[0]

# Function to display saliency map
def display_saliency_map(img_array, saliency_map):
    plt.imshow(saliency_map, cmap="viridis")
    plt.axis("off")
    plt.show()


In [None]:
    # Train the model
    model.fit(
        x=x_train,
        y=y_train,
        validation_data=(x_val, y_val),
        batch_size=10,
        epochs=20,
        verbose=1
    )

    # Evaluate the model
    predictions = (model.predict(x_val) > 0.5).astype("int32")
    report = classification_report(y_val, predictions, output_dict=True)
    print(classification_report(y_val, predictions))
    
    # Save fold results
    fold_results.append(report)

# Aggregate results
avg_accuracy = np.mean([fold['accuracy'] for fold in fold_results])
print(f"\nAverage Accuracy Across {n_splits} Folds: {avg_accuracy:.4f}")

In [None]:
model = Sequential([
    Conv2D(filters=32, kernel_size=(3,3), activation='relu', padding='same', input_shape=(512,512,1)),
    MaxPool2D(pool_size=(2,2), strides=2),
    Conv2D(filters=64, kernel_size=(3,3), activation='relu', padding='same'),
    MaxPool2D(pool_size=(2,2), strides=2),
    Flatten(),
    Dense(units=1, activation='sigmoid')
])

In [None]:
model.summary()
model.compile(
    optimizer=Adam(learning_rate=0.0001),
    loss='binary_crossentropy',
    metrics=['accuracy']
)

In [None]:
model.fit(
    x=x_train,
    y=y_train,
    validation_data=(x_test, y_test),
    batch_size=10,
    epochs=20,
    verbose=1
)

In [None]:
predictions = (model.predict(x_test) > 0.5).astype("int32")


In [None]:
model.save("/models/shri_model_1.keras")


In [None]:
print(len(y_test))
for i, j in enumerate(y_test):
    k = predictions[i][0]
    if j != k:
        print(j, k)

Train and test CNN model

In [None]:

# num_tests = 1
# cnns = []
# for i in range(num_tests):
# cnns.append(cnn.CNN(x_train, x_test, y_train, y_test))

Cross validation and bootstrapping

In [None]:
# print(cnns[0].test_acc)