In [None]:
import keras
import tensorflow as tf
from keras import layers
import matplotlib.pyplot as plt
from keras.preprocessing import image, image_dataset_from_directory
from keras.models import Sequential, Model
from keras.optimizers import Adam
from focal_loss import BinaryFocalLoss
from keras.layers import Dense, MaxPooling2D, Dropout, Flatten
from keras.preprocessing.image import ImageDataGenerator, load_img
from keras.callbacks import CSVLogger
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, CSVLogger
from tensorflow.keras.utils import to_categorical
import numpy as np
from sklearn.utils import shuffle, class_weight
from sklearn.metrics import classification_report, confusion_matrix
from focal_loss import BinaryFocalLoss
from tensorflow_addons.losses import SigmoidFocalCrossEntropy
import pandas as pd
import os

In [None]:
# Load the CheXpert training labels and build a class-balanced binary dataset.
CSV_PATH = '/home/jupyter-zaiman/data/DataCenter/CheXpert-v1.0/train.csv'
LABEL_COL = "No Finding"

# Keep only the image path and the "No Finding" flag; missing flags are treated as 0.
data = pd.read_csv(CSV_PATH)[["Path", LABEL_COL]].fillna(0)

# Count each class explicitly instead of relying on the ordering of value_counts().
is_abnormal = data[LABEL_COL] == 0   # "No Finding" == 0 -> abnormal study
is_normal = data[LABEL_COL] == 1     # "No Finding" == 1 -> normal study
abnormal = int(is_abnormal.sum())
normal = int(is_normal.sum())

print("Before Undersampling...")
print("Abnormal: " + str(abnormal))
print("Normal: " + str(normal))

# Undersample whichever class is larger down to the size of the smaller one.
# Rows whose flag is neither 0 nor 1 belong to neither class and are dropped.
n_per_class = min(abnormal, normal)
abnormal_df = data[is_abnormal]
normal_df = data[is_normal]
if abnormal >= normal:
    abnormal_df = abnormal_df.sample(n_per_class, random_state=3)
else:
    normal_df = normal_df.sample(n_per_class, random_state=3)
data = pd.concat([abnormal_df, normal_df], axis=0)

# Flip the flag so that the training target is abnormal = 1, normal = 0.
# Only the label column is touched (a frame-wide replace() would also rewrite
# any other cell that happens to equal 0 or 1). The generators need string labels.
data[LABEL_COL] = (1 - data[LABEL_COL]).astype(str)

print('After Undersampling:')
print(data[LABEL_COL].value_counts())
assert data[LABEL_COL].nunique() <= 2, "expected a binary target"
data.head()

In [None]:
from sklearn.model_selection import train_test_split

# 60 / 20 / 20 train / validation / test split.
# Stratify on the label so every split keeps the class balance produced by the
# undersampling step instead of leaving it to chance.
# NOTE(review): rows are split per image; if one patient contributes several
# images, the same patient can land in more than one split -- confirm whether a
# patient-level (grouped) split is required.
train, holdout = train_test_split(
    data, test_size=0.4, random_state=1, stratify=data["No Finding"]
)
valid, test = train_test_split(
    holdout, test_size=.5, random_state=1, stratify=holdout["No Finding"]
)
assert len(train) + len(valid) + len(test) == len(data)


In [None]:
# Training configuration.
# Target image size handed to the data generators (height, width), plus 3 channels.
IMAGE_HEIGHT = 224
IMAGE_WIDTH = 224
image_shape = (IMAGE_HEIGHT, IMAGE_WIDTH, 3)

# Optimisation settings.
BATCH_SIZE = 128
EPOCHS = 50
learning = 1e-3  # initial learning rate for the optimizer

In [None]:
# Build the train / validation / test image generators.
#
# The network below is a ResNet50 initialised with ImageNet weights, so inputs
# must go through the matching `resnet50.preprocess_input` transform rather
# than a plain 1/255 rescale, which feeds the pretrained layers a different
# input distribution from the one they were trained on.
_preprocess = tf.keras.applications.resnet50.preprocess_input

train_datagen = ImageDataGenerator(preprocessing_function=_preprocess)
valid_datagen = ImageDataGenerator(preprocessing_function=_preprocess)

# Arguments shared by all three generators.
_flow_kwargs = dict(
    directory='/home/jupyter-zaiman/data/DataCenter/',
    x_col="Path",
    y_col="No Finding",
    class_mode="binary",
    target_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
    batch_size=BATCH_SIZE,
)

train_generator = train_datagen.flow_from_dataframe(dataframe=train, **_flow_kwargs)
valid_generator = valid_datagen.flow_from_dataframe(dataframe=valid, shuffle=True, **_flow_kwargs)
# The test generator must not shuffle: predictions are later compared
# position-by-position against `test_generator.classes`.
test_generator = valid_datagen.flow_from_dataframe(dataframe=test, shuffle=False, **_flow_kwargs)

In [None]:
# Build the classifier: ImageNet-pretrained ResNet50 backbone (no top, global
# max pooling) followed by a small dense head with a single sigmoid output.
base = tf.keras.applications.ResNet50(
    weights="imagenet",
    include_top=False,
    pooling='max',
)

# Freeze the backbone except for layers whose name contains 'conv5'.
for layer in base.layers:
    layer.trainable = 'conv5' in layer.name

x = Dense(512, activation='relu')(base.output)
prediction = Dense(1, activation='sigmoid')(x)

model = Model(inputs=base.input, outputs=prediction)
model.summary()

In [None]:
# Compile and train the model, logging to and checkpointing in OUTPUT_DIR.
OUTPUT_DIR = '/home/jupyter-zaiman/COVID-19 Classification/ResNet50-UnderSampling/'
# Create the directory up front so the callbacks that write into it cannot fail.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# `learning_rate` is the supported keyword; the `lr` alias is deprecated.
model.compile(
    optimizer=Adam(learning_rate=learning),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Keep the weights of the epoch with the best validation accuracy. An explicit
# file name is used (not a bare directory) so the checkpoint lands in a
# predictable file.
save = ModelCheckpoint(
    os.path.join(OUTPUT_DIR, 'best.weights.h5'),
    monitor='val_accuracy',
    save_best_only=True,
    save_weights_only=True,
    mode='max',
    verbose=1
)
# Stop after 6 epochs without val_loss improvement and roll back to the best
# epoch, so the model saved and evaluated afterwards is not the last
# (already degraded) one.
stop = EarlyStopping(
    monitor='val_loss',
    mode='min',
    verbose=1,
    patience=6,
    restore_best_weights=True,
)
# Multiply the learning rate by `factor` after 3 epochs without val_loss improvement.
# NOTE(review): factor=0.01 cuts the rate 100x in one step -- confirm this is intended.
scheduler = ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.01,
    patience=3,
    verbose=1
)
logger = CSVLogger(os.path.join(OUTPUT_DIR, 'training.log'))

history = model.fit(
    train_generator,
    epochs=EPOCHS,
    validation_data=valid_generator,
    verbose=1,
    callbacks=[save, scheduler, stop, logger]
)
model.save('ResNet50_Undersampling.h5')


In [None]:
import seaborn as sns


def _plot_history_metric(history, metric):
    """Plot the training and validation curves of `metric` over the epochs.

    `history` is the object returned by `model.fit`; `metric` is a key of
    `history.history` (e.g. 'accuracy' or 'loss') that also has a
    'val_' + metric counterpart.
    """
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title('model ' + metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    # The second curve comes from the validation generator, not the test set.
    plt.legend(['train', 'validation'], loc='lower right')
    plt.show()


print(history.history.keys())
_plot_history_metric(history, 'accuracy')
_plot_history_metric(history, 'loss')

# Predict on the held-out test set. The generator already defines the batching,
# so no batch size / step count is passed (the second positional argument of
# `predict` is `batch_size`, which must not be set for generator input).
Y_pred = model.predict(test_generator)
assert len(Y_pred) == len(test_generator.classes), "one prediction per test image expected"

# Threshold the sigmoid output at 0.5. The builtin `int` replaces the removed `np.int` alias.
labels = (Y_pred >= 0.5).astype(int)

print('Confusion Matrix')
cm = confusion_matrix(test_generator.classes, labels)
plt.figure(figsize=(6,6))
sns.heatmap(cm, annot=True, fmt="d");
plt.xlabel('predicted')
plt.ylabel('true')

target_names = ['Normal', 'Abnormal']
print('Classification Report')
print(classification_report(test_generator.classes, labels, target_names=target_names))

In [None]:
from sklearn.metrics import roc_curve


def plot_roc_curve(fpr, tpr, label=None):
    """Draw a ROC curve on the current axes.

    Plots `tpr` against `fpr`, adds the dashed diagonal from (0, 0) to (1, 1)
    as a reference, and fixes both axes to the [0, 1] range. `label` is
    attached to the curve's line.
    """
    axes = plt.gca()
    axes.plot(fpr, tpr, linewidth=2, label=label)
    axes.plot([0, 1], [0, 1], 'k--')
    axes.axis([0, 1, 0, 1])
    axes.set_title("ROC Curve")
    axes.set_xlabel('False Positive Rate')
    axes.set_ylabel('True Positive Rate')


# ROC points computed from the true test classes and the predicted scores.
fpr, tpr, thresholds = roc_curve(test_generator.classes, Y_pred)

plot_roc_curve(fpr, tpr)
plt.show()

In [None]:
from sklearn.metrics import roc_auc_score

# Area under the ROC curve must be computed from the predicted scores.
# Passing the 0/1 labels thresholded at 0.5 would collapse the curve to a
# single operating point and report a different number from the plotted ROC.
auc = roc_auc_score(test_generator.classes, Y_pred)
print(auc)