<a href="https://colab.research.google.com/github/hyesukim1/chest_X_ray_images_binary_classification/blob/main/GoogLeNet_chest_X_ray_image_binary_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Kaggle API로 연결하여 데이터 로드


In [None]:
# Install the Kaggle CLI, then prompt for a file upload through Colab
# (the `kaggle.json` file copied below is expected to be among the uploads).
!pip install kaggle
from google.colab import files
files.upload()

# Copy the uploaded `kaggle.json` into ~/.kaggle/.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

# Restrict the copied file to owner read/write only.
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the `paultimothymooney/chest-xray-pneumonia` dataset with the Kaggle CLI.
!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia

In [None]:
# List the working directory (presumably to confirm the archive was downloaded).
!ls

In [None]:
# Extract the downloaded archive quietly (-qq) into the current directory.
!unzip -qq "/content/chest-xray-pneumonia.zip"

---

In [None]:
# Root folder of the extracted dataset and its three split directories.
data_path = '/content/chest_xray/'

train_path, valid_path, test_path = (
    f'{data_path}{split}' for split in ('train', 'val', 'test')
)

In [None]:
from glob import glob

# Report how many entries sit two levels below each split directory
# (i.e. inside its immediate sub-folders).
print('\n'.join(
    f'number of {split_name} data: {len(glob(split_dir + "/*/*"))}'
    for split_name, split_dir in (('train', train_path),
                                  ('validation', valid_path),
                                  ('test', test_path))
))

In [None]:
from tqdm import tqdm
import pandas as pd
import numpy as np
import os
import random

# One fixed seed for every random source this notebook controls.
seed = 73
os.environ['PYTHONHASHSEED'] = '73'

np.random.seed(seed)
random.seed(seed)

# Collects one {'path', 'label'} record per image; filled by prepareData.
img_map = []

def prepareData(Dir, start):
  """Append one record per image file under ``Dir`` to the global ``img_map``.

  ``Dir`` must contain a ``NORMAL`` and a ``PNEUMONIA`` sub-folder. Every
  entry of each sub-folder is recorded as ``{'path': <full path>,
  'label': <sub-folder name>}``.

  Args:
    Dir: directory holding the two class sub-folders.
    start: name of the split being read; accepted for the callers' benefit
      but not used by this function.
  """
  # The loop variable used to shadow the list it iterated over, and a
  # `class_num` was derived from `str.index` (always 0) and never used.
  for label in ("NORMAL", "PNEUMONIA"):
    class_dir = os.path.join(Dir, label)

    # os.listdir returns entries in arbitrary order; sorting keeps the later
    # seeded shuffle/split reproducible across machines and file systems.
    for file_name in tqdm(sorted(os.listdir(class_dir))):
      img_map.append({'path': os.path.join(class_dir, file_name), 'label': label})

# Pool the images of all three on-disk splits into img_map.
prepareData(Dir=train_path, start='train')
prepareData(Dir=valid_path, start='val')
prepareData(Dir=test_path, start='test')

# Turn the record list into a table, then shuffle every row with the fixed seed.
img_map = pd.DataFrame(img_map)
img_map = img_map.sample(frac=1, random_state=seed)

In [None]:
# Display the (rows, columns) shape of the shuffled image table.
img_map.shape

In [None]:
from sklearn.model_selection import StratifiedShuffleSplit

# Image paths act as the "features"; the class names drive the stratification.
features, labels = (img_map[column].to_numpy() for column in ('path', 'label'))

# Stratified splitter that holds out 30% of the rows.
stratified_sample = StratifiedShuffleSplit(
    n_splits=2,
    test_size=0.3,
    random_state=73,
)


In [None]:
# Materialise the split. Every iteration overwrites the previous one, so the
# LAST split produced by the splitter is the one that is kept.
for train_index, test_index in stratified_sample.split(features, labels):
  X_train, test_X = features[train_index], features[test_index]
  y_train, test_y = labels[train_index], labels[test_index]

# Cut the held-out part in half: first half -> test, second half -> validation.
# `np.int` was deprecated in NumPy 1.20 and removed in 1.24; integer floor
# division gives the same value without it.
half_size = len(test_X) // 2
X_test, y_test = test_X[:half_size], test_y[:half_size]
X_val, y_val = test_X[half_size:], test_y[half_size:]

In [None]:
# One two-column (path, label) table per split.
train_map = pd.DataFrame({'path': X_train, 'label': y_train})
test_map = pd.DataFrame({'path': X_test, 'label': y_test})
val_map = pd.DataFrame({'path': X_val, 'label': y_val})

In [None]:
# Data summary: number of samples in each split.
print('\n'.join(
    '> {} {} size'.format(split_paths.shape[0], split_name)
    for split_name, split_paths in (('train', X_train), ('test', X_test), ('val', X_val))
))

In [None]:
import cv2
import time
import imageio

# Model input geometry: square IMG_SIZE x IMG_SIZE images with ColorCh channels.
ColorCh = 3
IMG_SIZE = 224
input_shape=(IMG_SIZE, IMG_SIZE, ColorCh)

# Class names used for tick labels and report rows. The second name was
# misspelled "PNEMONIA"; it now matches the "PNEUMONIA" label used by prepareData.
classes = ("NORMAL", "PNEUMONIA")
CATEGORIES = sorted(classes)

print('classes:', CATEGORIES)

In [None]:
from keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img

# Preprocessing/augmentation pipeline: rescale pixels by 1/255 and apply random
# horizontal flips, brightness changes and rotations of up to 15 degrees.
# NOTE(review): get_generator uses this one object for the validation and test
# frames as well, so those images are augmented too -- confirm this is intended.
datagen = ImageDataGenerator(
    rotation_range=15,
    brightness_range=[1.0, 1.3],
    horizontal_flip=True,
    rescale=1./255,
)

In [None]:
# Number of images per generated batch (read by get_generator).
batch_size = 64

def get_generator(frame_):
  """Build a batch iterator over the images listed in ``frame_``.

  Args:
    frame_: table with a 'path' column (image file paths) and a 'label' column.

  Returns:
    The iterator produced by ``datagen.flow_from_dataframe``: unshuffled RGB
    batches of ``batch_size`` images resized to IMG_SIZE x IMG_SIZE, with
    sparse (integer) class labels.
  """
  flow_options = dict(
      dataframe=frame_,
      x_col='path',
      y_col='label',
      target_size=(IMG_SIZE, IMG_SIZE),
      color_mode='rgb',
      class_mode='sparse',
      batch_size=batch_size,
      shuffle=False,
      seed=seed,
      save_format='jpeg',
  )
  return datagen.flow_from_dataframe(**flow_options)

In [None]:
def getLabelCount(frame):
    """Count how many rows of ``frame`` carry each value of its 'label' column.

    Args:
        frame: table with a 'label' column.

    Returns:
        pandas Series mapping each label to its number of occurrences, most
        frequent label first.
    """
    # A dead local (`n_classes = (label_count)`) that shadowed the notebook's
    # global of the same name was removed; the result is unchanged.
    return pd.Series(frame['label'].values.ravel()).value_counts()

In [None]:
# Shuffle the training table with the fixed seed and wrap it in a batch generator.
train_df = train_map.sample(frac=1, random_state=seed)
train_generator = get_generator(train_df)

# Heading ("label counts of the training set"), then the per-class counts.
print('훈련 셋의 라벨 갯수')
getLabelCount(train_df)

In [None]:
# Shuffle the validation table with the fixed seed and wrap it in a batch generator.
val_df = val_map.sample(frac=1, random_state=seed)
val_generator = get_generator(val_df)

# Heading ("label counts of the validation set"), then the per-class counts.
print('검증 셋의 라벨 갯수')
getLabelCount(val_df)

In [None]:
# Shuffle the test table with the fixed seed and wrap it in a batch generator.
test_df = test_map.sample(frac=1, random_state=seed)
test_generator = get_generator(test_df)

# Heading ("label counts of the test set"), then the per-class counts.
print('테스트 셋의 라벨 갯수')
getLabelCount(test_df)

---

# Building Models

In [None]:
from keras.backend import separable_conv2d
import keras
import tensorflow as tf
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Add, add
from tensorflow.keras.layers import InputLayer, Input, Conv2D, MaxPooling2D, AveragePooling2D, GlobalAveragePooling2D
from tensorflow.keras.layers import Activation, MaxPool2D, ZeroPadding2D, SeparableConv2D
from keras.layers.normalization import batch_normalization
from tensorflow.keras.models import Model, Sequential
from keras import regularizers

# Output activation, loss name and class count shared by the model builders.
n_classes = len(CATEGORIES)
final_activation = 'softmax'
entropy = 'sparse_categorical_crossentropy'

# L2 weight penalty with factor 1e-4.
kernel_regularizer = regularizers.l2(0.0001)

print(n_classes)

In [None]:
def FCLayers(baseModel):
  """Attach a dropout + dense classification head to ``baseModel``.

  Args:
    baseModel: model whose ``output`` feeds the new head; it is marked
      trainable as a side effect.

  Returns:
    A Model from ``baseModel.input`` to an ``n_classes``-unit dense layer that
    uses the notebook's ``final_activation``.
  """
  baseModel.trainable = True
  headModel = baseModel.output
  headModel = Dropout(0.5, seed=73)(headModel)
  # The keyword was misspelled `activtion`, which made this call raise.
  headModel = Dense(n_classes, activation=final_activation)(headModel)
  model = Model(inputs = baseModel.input, outputs = headModel)
  return model


## GoogLeNet 모델

In [None]:
# Inception Block 만들기

from keras.layers.merge import concatenate

def Inception_block(input_layer, f1, f2, f3, f4):
    """Build four parallel branches on ``input_layer`` and concatenate them.

    Args:
        input_layer: tensor fed to every branch.
        f1: filters of the plain 1x1 convolution branch.
        f2: (1x1 filters, 3x3 filters) of the 3x3 branch.
        f3: (1x1 filters, 5x5 filters) of the 5x5 branch.
        f4: filters of the 1x1 convolution that follows the 3x3 max-pool.

    Returns:
        The four branch outputs concatenated along the last axis.
    """
    def conv_relu(n_filters, size, tensor):
        # 'same'-padded ReLU convolution with a square kernel.
        return Conv2D(filters=n_filters, kernel_size=(size, size),
                      padding='same', activation='relu')(tensor)

    branch_1x1 = conv_relu(f1, 1, input_layer)
    branch_3x3 = conv_relu(f2[1], 3, conv_relu(f2[0], 1, input_layer))
    branch_5x5 = conv_relu(f3[1], 5, conv_relu(f3[0], 1, input_layer))

    pooled = MaxPooling2D((3, 3), strides=(1, 1), padding='same')(input_layer)
    branch_pool = conv_relu(f4, 1, pooled)

    return concatenate([branch_1x1, branch_3x3, branch_5x5, branch_pool], axis=-1)

In [None]:
def Extra_network_2(X):
  """Second auxiliary classifier head; its output layer is named 'output2'.

  Mirrors Extra_network_1: 5x5 average pool (stride 3), 1x1 convolution with
  128 filters, flatten, a 1024-unit ReLU layer, then the class layer.

  Args:
    X: intermediate feature tensor to classify.

  Returns:
    Tensor with ``n_classes`` values per sample from the 'output2' layer.
  """
  X2 = AveragePooling2D(pool_size = (5,5), strides = 3)(X)
  X2 = Conv2D(filters =128, kernel_size = (1,1), padding = 'same', activation = 'relu')(X2)

  X2 = Flatten()(X2)
  # Previously the 1024-unit layer itself was the softmax 'output2', so this
  # head predicted over 1024 "classes" instead of n_classes.
  X2 = Dense(1024, activation = 'relu')(X2)
  X2 = Dense(n_classes, activation = final_activation, name = 'output2')(X2)
  return X2

def Extra_network_1(X):
  """First auxiliary classifier head; its output layer is named 'output1'.

  Args:
    X: intermediate feature tensor to classify.

  Returns:
    Tensor with ``n_classes`` values per sample from the 'output1' layer.
  """
  head_layers = (
      AveragePooling2D(pool_size=(5, 5), strides=3),
      Conv2D(filters=128, kernel_size=(1, 1), padding='same', activation='relu'),
      Flatten(),
      Dense(1024, activation='relu'),
      Dense(n_classes, activation=final_activation, name='output1'),
  )
  # Apply the layers in the order they are listed.
  for layer in head_layers:
    X = layer(X)
  return X

In [None]:
def layer_4(X):
  """Stack five inception blocks, tapping two auxiliary heads along the way.

  Args:
    X: input feature tensor.

  Returns:
    (features, aux1, aux2): the max-pooled output of the fifth block, the
    Extra_network_1 head taken after the first block, and the Extra_network_2
    head taken after the fourth block.
  """
  stage = Inception_block(X, 192, (96, 208), (16, 48), 128)
  aux_head_1 = Extra_network_1(stage)

  middle_blocks = (
      (160, (112, 224), (24, 64), 64),
      (128, (128, 256), (24, 64), 64),
      (112, (144, 288), (32, 64), 64),
  )
  for block_filters in middle_blocks:
    stage = Inception_block(stage, *block_filters)
  aux_head_2 = Extra_network_2(stage)

  stage = Inception_block(stage, 256, (160, 320), (32, 128), 128)
  stage = MaxPooling2D(pool_size=3, strides=2)(stage)

  return stage, aux_head_1, aux_head_2

def layer_3(X):
  """Two inception blocks followed by a stride-2 3x3 max-pool.

  Args:
    X: input feature tensor.

  Returns:
    The pooled output of the second inception block.
  """
  first_block = Inception_block(X, 64, (96, 128), (16, 32), 32)
  second_block = Inception_block(first_block, 128, (128, 192), (32, 96), 64)
  return MaxPooling2D(pool_size=(3,3), strides=2)(second_block)

def layer_2(X):
  """1x1 convolution (64 filters), 3x3 convolution (192 filters), 3x3 max-pool.

  Args:
    X: input feature tensor.

  Returns:
    The stride-2 max-pooled output of the 3x3 convolution.
  """
  reduce_1x1 = Conv2D(filters=64, kernel_size=1, strides=1, padding='same', activation='relu')
  conv_3x3 = Conv2D(filters=192, kernel_size=3, padding='same', activation='relu')
  downsample = MaxPooling2D(pool_size=3, strides=2)

  return downsample(conv_3x3(reduce_1x1(X)))

In [None]:
def load_GoogLeNet():
  """Assemble the three-output 'GoogLeNet' model for ``input_shape`` images.

  Returns:
    A Model whose outputs are, in order: the main classifier ('output3') and
    the two auxiliary heads returned by layer_4 ('output1', 'output2').
  """
  input_layer = Input(shape=input_shape)

  # Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max-pool.
  stem = Conv2D(64, kernel_size=7, strides=2, padding='valid', activation='relu')(input_layer)
  stem = MaxPooling2D(pool_size=3, strides=2)(stem)

  trunk, aux_out_1, aux_out_2 = layer_4(layer_3(layer_2(stem)))

  final_blocks = (
      (256, (160, 320), (32, 128), 128),
      (384, (192, 384), (48, 128), 128),
  )
  for block_filters in final_blocks:
    trunk = Inception_block(trunk, *block_filters)

  # Main head: global average pool, dropout, then the class layer.
  pooled = GlobalAveragePooling2D()(trunk)
  pooled = Dropout(0.6)(pooled)
  main_out = Dense(n_classes, activation=final_activation, name='output3')(pooled)

  return Model(input_layer, [main_out, aux_out_1, aux_out_2], name='GoogLeNet')

# Build a throw-away model instance just to print its layer summary.
load_GoogLeNet().summary()

In [None]:
from tensorflow.keras.callbacks import Callback, ModelCheckpoint, LearningRateScheduler, TensorBoard, EarlyStopping, ReduceLROnPlateau

In [None]:
# Upper bound on training epochs (set to 1 for a quick smoke run) and the
# early-stopping patience.
EPOCHS = 120
patience = 3

# Learning-rate bounds used by lrfn.
start_lr = 1e-5
min_lr = 1e-5
max_lr = 5e-5

# Shape of the schedule: ramp-up length, plateau length and per-epoch decay factor.
rampup_epochs = 5
sustain_epochs = 0
exp_decay = 0.8
        
def lrfn(epoch):
    """Learning rate for ``epoch``: linear ramp-up, optional plateau, exponential decay.

    Args:
        epoch: 0-based epoch number.

    Returns:
        A value rising linearly from ``start_lr`` towards ``max_lr`` during the
        first ``rampup_epochs`` epochs, equal to ``max_lr`` for the next
        ``sustain_epochs`` epochs, and decaying towards ``min_lr`` by a factor
        of ``exp_decay`` per epoch afterwards.
    """
    if epoch < rampup_epochs:
        slope = (max_lr - start_lr)/rampup_epochs
        return slope * epoch + start_lr

    if epoch < rampup_epochs + sustain_epochs:
        return max_lr

    epochs_into_decay = epoch-rampup_epochs-sustain_epochs
    return (max_lr - min_lr) * exp_decay**epochs_into_decay + min_lr
   
def getCallbacks(name):
    """Create the list of training callbacks for the model called ``name``.

    Args:
        name: model name; used as the prefix of the checkpoint file
            ``<name>_Weights.h5``.

    Returns:
        List, in order: accuracy-threshold stopper, learning-rate scheduler
        (lrfn), best-weights checkpoint, early stopping, and
        reduce-LR-on-plateau. A TensorBoard callback is built as well but is
        deliberately left out of the list.
    """
    import datetime

    DESIRED_ACCURACY = 0.90

    class myCallback(tf.keras.callbacks.Callback):
        """Stop training once the logged 'accuracy' reaches DESIRED_ACCURACY."""

        # `logs=None` replaces the shared mutable default `logs={}`.
        # NOTE(review): FitModel reads per-output keys such as
        # 'output3_accuracy', so a plain 'accuracy' key is presumably absent
        # for the three-output model and this stopper never fires -- confirm.
        def on_epoch_end(self, epoch, logs=None):
            accuracy = (logs or {}).get('accuracy')
            if accuracy is not None and accuracy >= DESIRED_ACCURACY:
                print("\nLimits Reached cancelling training!")
                self.model.stop_training = True

    end_callback = myCallback()

    # NOTE(review): the scheduler below sets the learning rate every epoch, so
    # reductions made by this plateau callback are presumably overwritten --
    # confirm whether both are wanted.
    lr_plat = ReduceLROnPlateau(patience = 2, mode = 'min')

    lr_callback = LearningRateScheduler(lrfn, verbose=False)

    early_stopping = EarlyStopping(patience = patience, monitor='val_loss',
                                 mode='min', restore_best_weights=True,
                                 verbose = 1, min_delta = .00075)

    checkpoint_filepath = name + '_Weights.h5'

    # Keep only the weights of the epoch with the lowest validation loss.
    model_checkpoints = ModelCheckpoint(filepath=checkpoint_filepath,
                                        save_weights_only=True,
                                        monitor='val_loss',
                                        mode='min',
                                        verbose = 1,
                                        save_best_only=True)

    log_dir="logs/fit/" + '_' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    tensorboard_callback = TensorBoard(log_dir = log_dir, write_graph=True, histogram_freq=1)

    return [end_callback,
             lr_callback,
             model_checkpoints,
             early_stopping,
             #tensorboard_callback,
             lr_plat
            ]

# Callback list for the GoogLeNet run (read by FitModel).
GoogLeNet_callbacks = getCallbacks('GoogLeNet')

In [None]:
def CompileModel(name, model):
    """Compile ``model`` when ``name`` is 'GoogLeNet'; otherwise return it untouched.

    Args:
        name: model name.
        model: object exposing ``compile``.

    Returns:
        The same ``model`` object; for 'GoogLeNet' it has been compiled with
        the 'adam' optimizer, the notebook's ``entropy`` loss and an accuracy
        metric.
    """
    if name != 'GoogLeNet':
        return model

    model.compile(optimizer='adam', loss=entropy, metrics=["accuracy"])
    return model

def FitModel(model, name):
    """Train ``model`` on the notebook's generators, then reload its best checkpoint.

    Args:
        model: compiled model exposing ``fit`` and ``load_weights``.
        name: model name; selects the callback list and the checkpoint file
            ``<name>_Weights.h5``. Only 'GoogLeNet' is supported.

    Returns:
        (model, history): the model with the checkpoint weights loaded, and
        the history object returned by ``model.fit``.

    Raises:
        ValueError: if ``name`` is not 'GoogLeNet'.
    """
    if name != 'GoogLeNet':
        # An unknown name used to fall through to an unbound `callbacks_`.
        raise ValueError("FitModel only supports 'GoogLeNet', got {!r}".format(name))
    callbacks_ = GoogLeNet_callbacks

    # len(generator) is its number of batches per pass over the data. The
    # earlier `len(labels) / 80` produced a float and assumed 80 images per
    # batch, which does not match the `batch_size` the generators are built with.
    history = model.fit(train_generator,
                        epochs=EPOCHS,
                        callbacks=callbacks_,
                        validation_data = val_generator,
                        steps_per_epoch=len(train_generator),
                        validation_steps=len(val_generator),
                       )

    print(history.history.keys())

    model.load_weights(name + '_Weights.h5')

    train_accuracy = history.history['output3_accuracy']
    train_loss = history.history['loss']

    val_accuracy = history.history['val_output3_accuracy']
    val_loss = history.history['val_loss']

    # 0-based epoch with the lowest validation loss. The old estimate,
    # `epochs - (patience + 1)`, was only right when early stopping fired and
    # could go negative on short runs.
    index = int(np.argmin(val_loss))

    print('\n')
    print('---'*15)
    print(name,' Model')
    print('Total Epochs :', len(history.history['loss']))
    print('Restoring best Weights')

    print('---'*15)
    print('Best Epoch :', index)
    print('---'*15)

    print('Accuracy on train:', train_accuracy,
          '\tLoss on train:', train_loss)

    print('Accuracy on val:', val_accuracy ,
          '\tLoss on val:', val_loss)
    print('---'*15)

    return model, history

In [None]:
def BuildModel(name):
    """Construct and compile the model called ``name``.

    Args:
        name: model name; only 'GoogLeNet' is supported.

    Returns:
        The result of ``CompileModel(name, load_GoogLeNet())``.

    Raises:
        ValueError: if ``name`` is not 'GoogLeNet' (this used to surface as an
            unbound-local error on ``prepared_model``).
    """
    if name != 'GoogLeNet':
        raise ValueError("BuildModel only supports 'GoogLeNet', got {!r}".format(name))

    prepared_model = load_GoogLeNet()
    compiled_model = CompileModel(name, prepared_model)
    return compiled_model

In [None]:
# Build + compile the GoogLeNet model, then train it; FitModel returns the
# model together with its training history.
g_compiled_model = BuildModel('GoogLeNet')
g_model, g_history = FitModel(g_compiled_model, 'GoogLeNet')

# Model Evaluation on the Test Set

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
def print_graph(item, index, history):
    """Plot training vs. validation curves of one metric and save them as a PNG.

    Args:
        item: history key of the training metric (e.g. 'loss'); the validation
            series is read from 'val_' + item.
        index: number of leading epochs to plot (values ``[0:index]``).
        history: object whose ``history`` dict holds the per-epoch series.

    Side effects:
        Writes '<item>.png' to the working directory and shows the figure.
    """
    plt.figure()
    train_values = history.history[item][0:index]
    plt.plot(train_values)
    test_values = history.history['val_' + item][0:index]
    plt.plot(test_values)
    plt.legend(['training','validation'])
    plt.title('Training and validation '+ item)
    plt.xlabel('epoch')

    # Save BEFORE showing: once the figure has been shown it may be released,
    # and a later savefig would write an empty image.
    plot = '{}.png'.format(item)
    plt.savefig(plot)
    plt.show()

In [None]:
import seaborn as sns
from sklearn import metrics
from sklearn.metrics import roc_curve, roc_auc_score, plot_roc_curve, accuracy_score, classification_report, confusion_matrix

def test_set_results(pred_value, n=1):    
    y_test = test_generator.labels
    X_test, _ = test_generator.next()
    
    corr_pred = metrics.confusion_matrix(y_test, pred_value)
    fig=plt.figure(figsize=(10, 8))
    ax = plt.axes()
    
    sns.heatmap(corr_pred,annot=True, fmt="d",cmap="Purples", xticklabels=CATEGORIES, yticklabels=CATEGORIES)
    ax.set_title('Dense Output {}'.format(n))
    plt.show()
    
    n_correct = np.int(corr_pred[0][0] + corr_pred[1][1])
    print('...'*15)

    print('> Correct Predictions:', n_correct)
    
    n_wrongs = len(y_test) - n_correct
    print('> Wrong Predictions:', n_wrongs)
    print('...'*15)
    
    print(classification_report(test_generator.labels, pred_value, target_names=CATEGORIES))

In [None]:
def printResults(name, model):
    """Predict the test set with a single-output ``model`` and report the results.

    Args:
        name: model name (not used by this function).
        model: object exposing ``predict``.
    """
    class_scores = model.predict(test_generator, verbose=1)
    # Most likely class per sample.
    test_set_results(np.argmax(class_scores, axis=1))

In [None]:
def model_summary(model, history, name):
    """Evaluate ``model`` on the test set and report metrics, curves and confusion matrices.

    For 'GoogLeNet' each of the three outputs ('output1', 'output2', 'output3')
    gets its own loss/accuracy curves, train/test figures and confusion matrix.
    Any other name is treated as a single-output model.

    Args:
        model: trained model exposing ``evaluate`` and ``predict`` (and
            ``output_names`` for 'GoogLeNet').
        history: training history whose ``history`` dict holds per-epoch series.
        name: model name.
    """
    # Best epoch = lowest validation loss. The old estimate,
    # `epochs - (patience + 1)`, was only right when early stopping fired and
    # could go negative on short runs.
    index = int(np.argmin(history.history['val_loss']))
    # Plot up to and including the best epoch.
    plot_until = index + 1
    print('Best Epochs: ', index)

    if name == 'GoogLeNet':
        results = model.evaluate(test_generator, verbose=1)
        loss, output3_loss, output1_loss, output2_loss, output3_accuracy, output1_accuracy, output2_accuracy = results
        test_metrics = {
            1: (output1_accuracy, output1_loss),
            2: (output2_accuracy, output2_loss),
            3: (output3_accuracy, output3_loss),
        }

        # Predict once and key the arrays by output-layer name. Indexing the
        # prediction list by loop position paired 'output3' predictions with
        # the "output 1" report, because the model's outputs are not ordered
        # 1, 2, 3.
        predictions = dict(zip(model.output_names,
                               model.predict(test_generator, verbose=1)))

        for n in (1, 2, 3):
            output_name = 'output{}_'.format(n)
            test_accuracy, test_loss = test_metrics[n]
            train_accuracy = history.history[output_name + 'accuracy'][index]
            train_loss = history.history[output_name + 'loss'][index]

            print_graph(output_name + 'loss', plot_until, history)
            print_graph(output_name + 'accuracy', plot_until, history)

            print('---'*15)
            print('GoogLeNet Dense output {}:'.format(n))

            print('> Accuracy on train :', train_accuracy,
                  '\tLoss on train:',train_loss)

            print('> Accuracy on test :', test_accuracy,
                  '\tLoss on test:',test_loss)

            print('---'*15)
            print('> predicting test')
            print('---'*15)

            preds = np.argmax(predictions['output{}'.format(n)], axis=1)
            test_set_results(preds, n)

    else:
        test_loss, test_accuracy = model.evaluate(test_generator, verbose=1)

        train_accuracy = history.history['accuracy'][index]
        train_loss = history.history['loss'][index]

        print_graph('loss', plot_until, history)
        print_graph('accuracy', plot_until, history)

        print('---'*15)
        print(name)
        print('> Accuracy on train:',train_accuracy,
              '\tLoss on train:',train_loss)

        print('> Accuracy on test:',test_accuracy,
              '\tLoss on test:',test_loss)

        print('---'*15)
        print('> predicting test')
        print('---'*15)

        printResults(name, model)

In [None]:
# NOTE(review): this cell re-defines printResults with the same behavior as an
# earlier cell of this notebook; one of the two definitions is redundant.
def printResults(name, model):
    """Predict the test set with a single-output ``model`` and report the results.

    Args:
        name: model name (not used by this function).
        model: object exposing ``predict``.
    """
    class_scores = model.predict(test_generator, verbose=1)
    # Most likely class per sample.
    test_set_results(np.argmax(class_scores, axis=1))

In [None]:
# NOTE(review): this cell re-defines model_summary, which an earlier cell of
# this notebook also defines; this later definition is the one in effect.
def model_summary(model, history, name):
    """Evaluate ``model`` on the test set and report metrics, curves and confusion matrices.

    For 'GoogLeNet' each of the three outputs ('output1', 'output2', 'output3')
    gets its own loss/accuracy curves, train/test figures and confusion matrix.
    Any other name is treated as a single-output model.

    Args:
        model: trained model exposing ``evaluate`` and ``predict`` (and
            ``output_names`` for 'GoogLeNet').
        history: training history whose ``history`` dict holds per-epoch series.
        name: model name.
    """
    # Best epoch = lowest validation loss. The old estimate,
    # `epochs - (patience + 1)`, was only right when early stopping fired and
    # could go negative on short runs.
    index = int(np.argmin(history.history['val_loss']))
    # Plot up to and including the best epoch.
    plot_until = index + 1
    print('Best Epochs: ', index)

    if name == 'GoogLeNet':
        results = model.evaluate(test_generator, verbose=1)
        loss, output3_loss, output1_loss, output2_loss, output3_accuracy, output1_accuracy, output2_accuracy = results
        test_metrics = {
            1: (output1_accuracy, output1_loss),
            2: (output2_accuracy, output2_loss),
            3: (output3_accuracy, output3_loss),
        }

        # Predict once and key the arrays by output-layer name. Indexing the
        # prediction list by loop position paired 'output3' predictions with
        # the "output 1" report, because the model's outputs are not ordered
        # 1, 2, 3.
        predictions = dict(zip(model.output_names,
                               model.predict(test_generator, verbose=1)))

        for n in (1, 2, 3):
            output_name = 'output{}_'.format(n)
            test_accuracy, test_loss = test_metrics[n]
            train_accuracy = history.history[output_name + 'accuracy'][index]
            train_loss = history.history[output_name + 'loss'][index]

            print_graph(output_name + 'loss', plot_until, history)
            print_graph(output_name + 'accuracy', plot_until, history)

            print('---'*15)
            print('GoogLeNet Dense output {}:'.format(n))

            print('> Accuracy on train :', train_accuracy,
                  '\tLoss on train:',train_loss)

            print('> Accuracy on test :', test_accuracy,
                  '\tLoss on test:',test_loss)

            print('---'*15)
            print('> predicting test')
            print('---'*15)

            preds = np.argmax(predictions['output{}'.format(n)], axis=1)
            test_set_results(preds, n)

    else:
        test_loss, test_accuracy = model.evaluate(test_generator, verbose=1)

        train_accuracy = history.history['accuracy'][index]
        train_loss = history.history['loss'][index]

        print_graph('loss', plot_until, history)
        print_graph('accuracy', plot_until, history)

        print('---'*15)
        print(name)
        print('> Accuracy on train:',train_accuracy,
              '\tLoss on train:',train_loss)

        print('> Accuracy on test:',test_accuracy,
              '\tLoss on test:',test_loss)

        print('---'*15)
        print('> predicting test')
        print('---'*15)

        printResults(name, model)

In [None]:
# Report test-set metrics, training curves and confusion matrices for the trained GoogLeNet.
model_summary(g_model, g_history, 'GoogLeNet')