# **LIBRARIES**

In [None]:
import os
import pickle
import zipfile
import random
import cv2
import numpy as np
import pandas as pd
import keras
import tensorflow as tf
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, matthews_corrcoef
from keras.utils import to_categorical
from keras.applications import VGG16
from keras import Sequential
from keras.models import Model
from keras.layers import InputLayer, Flatten, Dense, Dropout, BatchNormalization
from keras.optimizers import Adam
from keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras.saving import load_model

In [None]:
def random_seed(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's legacy global RNG, TensorFlow
    and Keras with the same value, and exports ``PYTHONHASHSEED``.

    Args:
        seed: Integer seed applied to all generators.
    """
    # Derive the hash seed from the argument instead of a hard-coded "42",
    # so that calling random_seed(n) is consistent for any n.
    # NOTE: the interpreter reads PYTHONHASHSEED at start-up, so this only
    # influences child processes spawned after this call.
    os.environ["PYTHONHASHSEED"] = str(seed)

    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    keras.utils.set_random_seed(seed)

In [None]:
# Seed all RNGs once with 42 before any data handling or model code runs.
random_seed(42)

# **METRICS**

In [None]:
def fdr(y_true, y_pred):
    """False discovery rate, FP / (FP + TP), for binary 0/1 labels.

    Args:
        y_true: Array-like of ground-truth labels (0 or 1).
        y_pred: Array-like of predicted labels (0 or 1).

    Returns:
        The fraction of positive predictions that are wrong, as a float;
        0.0 when there are no positive predictions.
    """
    # Coerce and flatten: plain lists would otherwise compare as a single
    # scalar, and an (N, 1) column against an (N,) vector would broadcast
    # to (N, N) and inflate every count.
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    predicted_positive = y_pred == 1
    FP = np.sum(predicted_positive & (y_true == 0))
    TP = np.sum(predicted_positive & (y_true == 1))

    total = FP + TP
    return float(FP / total) if total > 0 else 0.0

In [None]:
def fnr(y_true, y_pred):
    """False negative rate, FN / (FN + TP), for binary 0/1 labels.

    Args:
        y_true: Array-like of ground-truth labels (0 or 1).
        y_pred: Array-like of predicted labels (0 or 1).

    Returns:
        The fraction of actual positives that were predicted negative, as a
        float; 0.0 when there are no actual positives.
    """
    # Coerce and flatten: plain lists would otherwise compare as a single
    # scalar, and an (N, 1) column against an (N,) vector would broadcast
    # to (N, N) and inflate every count.
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    actual_positive = y_true == 1
    FN = np.sum(actual_positive & (y_pred == 0))
    TP = np.sum(actual_positive & (y_pred == 1))

    total = FN + TP
    return float(FN / total) if total > 0 else 0.0

In [None]:
def specificity(y_true, y_pred):
    """Specificity (true negative rate), TN / (TN + FP), for binary 0/1 labels.

    Args:
        y_true: Array-like of ground-truth labels (0 or 1).
        y_pred: Array-like of predicted labels (0 or 1).

    Returns:
        The fraction of actual negatives that were predicted negative, as a
        float; 0.0 when there are no actual negatives.
    """
    # Coerce and flatten: plain lists would otherwise compare as a single
    # scalar, and an (N, 1) column against an (N,) vector would broadcast
    # to (N, N) and inflate every count.
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    actual_negative = y_true == 0
    TN = np.sum(actual_negative & (y_pred == 0))
    FP = np.sum(actual_negative & (y_pred == 1))

    total = TN + FP
    return float(TN / total) if total > 0 else 0.0

In [None]:
def npv(y_true, y_pred):
    """Negative predictive value, TN / (TN + FN), for binary 0/1 labels.

    Args:
        y_true: Array-like of ground-truth labels (0 or 1).
        y_pred: Array-like of predicted labels (0 or 1).

    Returns:
        The fraction of negative predictions that are correct, as a float;
        0.0 when there are no negative predictions.
    """
    # Coerce and flatten: plain lists would otherwise compare as a single
    # scalar, and an (N, 1) column against an (N,) vector would broadcast
    # to (N, N) and inflate every count.
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    predicted_negative = y_pred == 0
    TN = np.sum(predicted_negative & (y_true == 0))
    FN = np.sum(predicted_negative & (y_true == 1))

    total = TN + FN
    return float(TN / total) if total > 0 else 0.0

# **DATASET**

In [None]:
# Side length of the square images; the image arrays are later reshaped to
# (-1, img_size, img_size) using this value.
img_size = 128

In [None]:
# Three image variants are loaded per split, named after their directories:
# "chest-xray" (cxr), "segment" (ch0) and "segment_with_convexhull" (ch1).
# Labels are read once per split, from the chest-xray directory.

# --- training split ---
x_train_cxr = np.load(
    '/kaggle/input/pneumonia-detection-datasets/chest-xray/train/images.npy'
)
x_train_ch0 = np.load(
    '/kaggle/input/pneumonia-detection-datasets/segment/train/images.npy'
)
x_train_ch1 = np.load(
    '/kaggle/input/pneumonia-detection-datasets/segment_with_convexhull/train/images.npy'
)
y_train = np.load(
    '/kaggle/input/pneumonia-detection-datasets/chest-xray/train/labels.npy'
)

# --- test split ---
x_test_cxr = np.load(
    '/kaggle/input/pneumonia-detection-datasets/chest-xray/test/images.npy'
)
x_test_ch0 = np.load(
    '/kaggle/input/pneumonia-detection-datasets/segment/test/images.npy'
)
x_test_ch1 = np.load(
    '/kaggle/input/pneumonia-detection-datasets/segment_with_convexhull/test/images.npy'
)
y_test = np.load(
    '/kaggle/input/pneumonia-detection-datasets/chest-xray/test/labels.npy'
)

# Sanity check: print the shape of every loaded array, train first, then test.
for _loaded in (
    x_train_cxr, x_train_ch0, x_train_ch1, y_train,
    x_test_cxr, x_test_ch0, x_test_ch1, y_test,
):
    print(np.shape(_loaded))

In [None]:
# Carve a validation set out of the training data: 250 samples labelled 0 and
# 350 samples labelled 1, drawn without replacement after re-seeding.
# NOTE: this cell overwrites x_train_* / y_train, so re-running it removes a
# further 600 samples; run it exactly once per kernel session.
indices_0 = np.nonzero(y_train == 0)[0]
indices_1 = np.nonzero(y_train == 1)[0]

random_seed(42)
random_indices_0 = np.random.choice(indices_0, size=250, replace=False)
random_indices_1 = np.random.choice(indices_1, size=350, replace=False)
random_indices = np.concatenate((random_indices_0, random_indices_1), axis=0)

# Validation arrays keep the sampled order (all class-0 picks, then class-1).
x_val_cxr, x_val_ch0, x_val_ch1, y_val = (
    x_train_cxr[random_indices],
    x_train_ch0[random_indices],
    x_train_ch1[random_indices],
    y_train[random_indices],
)

# Drop the sampled rows from every training array with one shared mask.
_keep_mask = np.ones(len(y_train), dtype=bool)
_keep_mask[random_indices] = False

x_train_cxr = x_train_cxr[_keep_mask]
x_train_ch0 = x_train_ch0[_keep_mask]
x_train_ch1 = x_train_ch1[_keep_mask]
y_train = y_train[_keep_mask]

# Report the resulting shapes: train, validation, then test.
for _split_array in (
    x_train_cxr, x_train_ch0, x_train_ch1, y_train,
    x_val_cxr, x_val_ch0, x_val_ch1, y_val,
    x_test_cxr, x_test_ch0, x_test_ch1, y_test,
):
    print(np.shape(_split_array))

In [None]:
def _gray_and_rgb(images):
    """Return `images` reshaped to (-1, img_size, img_size) plus a copy with
    the single channel repeated three times along a new last axis."""
    gray = images.reshape(-1, img_size, img_size)
    return gray, np.stack((gray, gray, gray), axis=-1)


# Training split
x_train_cxr, x_train_rgb_cxr = _gray_and_rgb(x_train_cxr)
x_train_ch0, x_train_rgb_ch0 = _gray_and_rgb(x_train_ch0)
x_train_ch1, x_train_rgb_ch1 = _gray_and_rgb(x_train_ch1)

# Validation split
x_val_cxr, x_val_rgb_cxr = _gray_and_rgb(x_val_cxr)
x_val_ch0, x_val_rgb_ch0 = _gray_and_rgb(x_val_ch0)
x_val_ch1, x_val_rgb_ch1 = _gray_and_rgb(x_val_ch1)

# Test split
x_test_cxr, x_test_rgb_cxr = _gray_and_rgb(x_test_cxr)
x_test_ch0, x_test_rgb_ch0 = _gray_and_rgb(x_test_ch0)
x_test_ch1, x_test_rgb_ch1 = _gray_and_rgb(x_test_ch1)

# Shapes of the 3-channel arrays next to their label arrays.
for _stacked in (
    x_train_rgb_cxr, x_train_rgb_ch0, x_train_rgb_ch1, y_train,
    x_val_rgb_cxr, x_val_rgb_ch0, x_val_rgb_ch1, y_val,
    x_test_rgb_cxr, x_test_rgb_ch0, x_test_rgb_ch1, y_test,
):
    print(np.shape(_stacked))

In [None]:
# Show the same training sample from each of the three image variants.
idx = 9
for _image_stack, _imshow_options in (
    (x_train_rgb_cxr, {}),
    (x_train_rgb_ch0, {'cmap': 'gray'}),
    (x_train_rgb_ch1, {'cmap': 'gray'}),
):
    plt.imshow(_image_stack[idx], **_imshow_options)
    plt.show()

In [None]:
# Balanced class weights, computed from the integer training labels.
# NOTE: this must run while y_train still holds class labels, i.e. before the
# cell that one-hot encodes it.
classes = np.unique(y_train)
class_weights = compute_class_weight(class_weight="balanced", classes=classes, y=y_train)

# Keras documents `class_weight` as a mapping from integer class index to a
# float weight, so NumPy scalars are converted to plain Python int / float.
class_weight_dict = {int(cls): float(weight) for cls, weight in zip(classes, class_weights)}

print(class_weight_dict)

In [None]:
# One-hot encode the train / validation labels for the 2-unit softmax head.
# y_test is left as integer labels; the evaluation code compares it with
# argmax predictions.
y_train, y_val = (
    to_categorical(_labels, num_classes=2) for _labels in (y_train, y_val)
)

# Shapes of the single-channel arrays and the (now one-hot) labels.
for _checked in (
    x_train_cxr, x_train_ch0, x_train_ch1, y_train,
    x_val_cxr, x_val_ch0, x_val_ch1, y_val,
    x_test_cxr, x_test_ch0, x_test_ch1, y_test,
):
    print(np.shape(_checked))

# **VGG16**

In [None]:
# base_model = VGG16(weights='imagenet', include_top=True, input_shape=(128, 128, 3))
# base_model.summary()
# for i, layer in enumerate(base_model.layers):
#    print(i, layer.name, layer.trainable)

## **CXR**

In [None]:
# Two-stage transfer learning on the chest X-ray (CXR) images:
#   stage 1 - train only the new dense head on a frozen VGG16 backbone;
#   stage 2 - unfreeze the backbone and fine-tune everything at a lower LR.
random_seed(42)

# ---- Model: ImageNet-pretrained VGG16 (no top) + new classifier head ----
# NOTE(review): images are fed to VGG16 exactly as loaded; whether they also
# need keras.applications.vgg16.preprocess_input depends on how the .npy
# files were scaled - confirm.
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(img_size, img_size, 3))
base_model.trainable = False

x = Flatten()(base_model.output)

x = Dense(256, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
x = Dense(128, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
x = Dense(2, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=x)

# ---- Stage 1: train the head only ----
model.compile(optimizer=Adam(learning_rate=4e-6),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# model.summary()

early_stopping = EarlyStopping(
    monitor="val_accuracy",
    patience=10,
    restore_best_weights=True,
    verbose=1
)

history_1 = model.fit(
    x=x_train_rgb_cxr,
    y=y_train,
    validation_data=(x_val_rgb_cxr, y_val),
    batch_size=32,
    epochs=50,
    class_weight=class_weight_dict,
    callbacks=[early_stopping]
)

# ---- Stage 2: unfreeze the backbone and fine-tune end to end ----
for layer in base_model.layers:
    layer.trainable = True

# model.summary()

# Recompile so the trainable change takes effect. The loss stays
# categorical cross-entropy: the head is a 2-unit softmax trained on one-hot
# labels, so both stages optimise and monitor the same objective.
model.compile(optimizer=Adam(learning_rate=1e-6),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Fresh callback so the patience counter and best weights start over.
early_stopping = EarlyStopping(
    monitor="val_accuracy",
    patience=10,
    restore_best_weights=True,
    verbose=1
)

history_2 = model.fit(
    x=x_train_rgb_cxr,
    y=y_train,
    validation_data=(x_val_rgb_cxr, y_val),
    batch_size=32,
    epochs=50,
    class_weight=class_weight_dict,
    callbacks=[early_stopping]
)

# ---- Evaluation on the held-out test set ----
y_prob = model.predict(x_test_rgb_cxr, verbose=0)
y_score = y_prob[:, 1]               # softmax probability of class 1
y_pred = np.argmax(y_prob, axis=1)   # hard class decision

# AUC is computed from the class-1 probability: scoring hard 0/1 predictions
# collapses the ROC curve to a single operating point.
test_metrics = [
    ('accuracy', accuracy_score(y_test, y_pred)),
    ('precision', precision_score(y_test, y_pred)),
    ('FDR', fdr(y_test, y_pred)),
    ('recall', recall_score(y_test, y_pred)),
    ('FNR', fnr(y_test, y_pred)),
    ('specificity', specificity(y_test, y_pred)),
    ('NPV', npv(y_test, y_pred)),
    ('f1-score', f1_score(y_test, y_pred)),
    ('AUC', roc_auc_score(y_test, y_score)),
    ('MCC', matthews_corrcoef(y_test, y_pred)),
]
for metric_name, metric_value in test_metrics:
    print('{} = {}'.format(metric_name, metric_value))

## **CH0**

In [None]:
# Two-stage transfer learning on the "segment" (CH0) images:
#   stage 1 - train only the new dense head on a frozen VGG16 backbone;
#   stage 2 - unfreeze the backbone and fine-tune everything at a lower LR.
random_seed(42)

# ---- Model: ImageNet-pretrained VGG16 (no top) + new classifier head ----
# NOTE(review): images are fed to VGG16 exactly as loaded; whether they also
# need keras.applications.vgg16.preprocess_input depends on how the .npy
# files were scaled - confirm.
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(img_size, img_size, 3))
base_model.trainable = False

x = Flatten()(base_model.output)

x = Dense(256, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
x = Dense(128, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
x = Dense(2, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=x)

# ---- Stage 1: train the head only ----
model.compile(optimizer=Adam(learning_rate=4e-6),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# model.summary()

early_stopping = EarlyStopping(
    monitor="val_accuracy",
    patience=10,
    restore_best_weights=True,
    verbose=1
)

history_1 = model.fit(
    x=x_train_rgb_ch0,
    y=y_train,
    validation_data=(x_val_rgb_ch0, y_val),
    batch_size=32,
    epochs=50,
    class_weight=class_weight_dict,
    callbacks=[early_stopping]
)

# ---- Stage 2: unfreeze the backbone and fine-tune end to end ----
for layer in base_model.layers:
    layer.trainable = True

# model.summary()

# Recompile so the trainable change takes effect. The loss stays
# categorical cross-entropy: the head is a 2-unit softmax trained on one-hot
# labels, so both stages optimise and monitor the same objective.
model.compile(optimizer=Adam(learning_rate=1e-6),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Fresh callback so the patience counter and best weights start over.
early_stopping = EarlyStopping(
    monitor="val_accuracy",
    patience=10,
    restore_best_weights=True,
    verbose=1
)

history_2 = model.fit(
    x=x_train_rgb_ch0,
    y=y_train,
    validation_data=(x_val_rgb_ch0, y_val),
    batch_size=32,
    epochs=50,
    class_weight=class_weight_dict,
    callbacks=[early_stopping]
)

# ---- Evaluation on the held-out test set ----
y_prob = model.predict(x_test_rgb_ch0, verbose=0)
y_score = y_prob[:, 1]               # softmax probability of class 1
y_pred = np.argmax(y_prob, axis=1)   # hard class decision

# AUC is computed from the class-1 probability: scoring hard 0/1 predictions
# collapses the ROC curve to a single operating point.
test_metrics = [
    ('accuracy', accuracy_score(y_test, y_pred)),
    ('precision', precision_score(y_test, y_pred)),
    ('FDR', fdr(y_test, y_pred)),
    ('recall', recall_score(y_test, y_pred)),
    ('FNR', fnr(y_test, y_pred)),
    ('specificity', specificity(y_test, y_pred)),
    ('NPV', npv(y_test, y_pred)),
    ('f1-score', f1_score(y_test, y_pred)),
    ('AUC', roc_auc_score(y_test, y_score)),
    ('MCC', matthews_corrcoef(y_test, y_pred)),
]
for metric_name, metric_value in test_metrics:
    print('{} = {}'.format(metric_name, metric_value))

## **CH1**

In [None]:
# Two-stage transfer learning on the "segment_with_convexhull" (CH1) images:
#   stage 1 - train only the new dense head on a frozen VGG16 backbone;
#   stage 2 - unfreeze the backbone and fine-tune everything at a lower LR.
random_seed(42)

# ---- Model: ImageNet-pretrained VGG16 (no top) + new classifier head ----
# NOTE(review): images are fed to VGG16 exactly as loaded; whether they also
# need keras.applications.vgg16.preprocess_input depends on how the .npy
# files were scaled - confirm.
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(img_size, img_size, 3))
base_model.trainable = False

x = Flatten()(base_model.output)

x = Dense(256, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
x = Dense(128, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
x = Dense(2, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=x)

# ---- Stage 1: train the head only ----
model.compile(optimizer=Adam(learning_rate=4e-6),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# model.summary()

early_stopping = EarlyStopping(
    monitor="val_accuracy",
    patience=10,
    restore_best_weights=True,
    verbose=1
)

history_1 = model.fit(
    x=x_train_rgb_ch1,
    y=y_train,
    validation_data=(x_val_rgb_ch1, y_val),
    batch_size=32,
    epochs=50,
    class_weight=class_weight_dict,
    callbacks=[early_stopping]
)

# ---- Stage 2: unfreeze the backbone and fine-tune end to end ----
for layer in base_model.layers:
    layer.trainable = True

# model.summary()

# Recompile so the trainable change takes effect. The loss stays
# categorical cross-entropy: the head is a 2-unit softmax trained on one-hot
# labels, so both stages optimise and monitor the same objective.
model.compile(optimizer=Adam(learning_rate=1e-6),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Fresh callback so the patience counter and best weights start over.
early_stopping = EarlyStopping(
    monitor="val_accuracy",
    patience=10,
    restore_best_weights=True,
    verbose=1
)

history_2 = model.fit(
    x=x_train_rgb_ch1,
    y=y_train,
    validation_data=(x_val_rgb_ch1, y_val),
    batch_size=32,
    epochs=50,
    class_weight=class_weight_dict,
    callbacks=[early_stopping]
)

# ---- Evaluation on the held-out test set ----
y_prob = model.predict(x_test_rgb_ch1, verbose=0)
y_score = y_prob[:, 1]               # softmax probability of class 1
y_pred = np.argmax(y_prob, axis=1)   # hard class decision

# AUC is computed from the class-1 probability: scoring hard 0/1 predictions
# collapses the ROC curve to a single operating point.
test_metrics = [
    ('accuracy', accuracy_score(y_test, y_pred)),
    ('precision', precision_score(y_test, y_pred)),
    ('FDR', fdr(y_test, y_pred)),
    ('recall', recall_score(y_test, y_pred)),
    ('FNR', fnr(y_test, y_pred)),
    ('specificity', specificity(y_test, y_pred)),
    ('NPV', npv(y_test, y_pred)),
    ('f1-score', f1_score(y_test, y_pred)),
    ('AUC', roc_auc_score(y_test, y_score)),
    ('MCC', matthews_corrcoef(y_test, y_pred)),
]
for metric_name, metric_value in test_metrics:
    print('{} = {}'.format(metric_name, metric_value))

In [None]:
# model.save('/kaggle/working/model.h5')

# with open('history_1.pkl', 'wb') as f:
#     pickle.dump(history_1.history, f)

# with open('history_2.pkl', 'wb') as f:
#     pickle.dump(history_2.history, f)

# with zipfile.ZipFile('model.zip', 'w') as zipf:
#     zipf.write('model.h5')
#     zipf.write('history_1.pkl')
#     zipf.write('history_2.pkl')

In [None]:
# !rm -rf /kaggle/working/*