In [None]:
import os
import glob
import h5py
import shutil
import imgaug as aug
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.image as mimg
import imgaug.augmenters as iaa
import tensorflow as tf
from os import listdir, makedirs, getcwd, remove
from os.path import isfile, join, abspath, exists, isdir, expanduser
from PIL import Image
from pathlib import Path
from skimage.io import imread
from skimage.transform import resize
from keras.models import Sequential, Model
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.preprocessing.image import ImageDataGenerator,load_img, img_to_array
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dense, Dropout, Input, Flatten, SeparableConv2D
from keras.layers import GlobalMaxPooling2D
from keras.layers import BatchNormalization
from keras.layers.merge import Concatenate
from keras.models import Model
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from keras.callbacks import ModelCheckpoint, Callback, EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from mlxtend.plotting import plot_confusion_matrix
from sklearn.metrics import confusion_matrix
import cv2
from keras import backend as K
# Default seaborn colour palette kept in a module-level name (not referenced by the cells visible here).
color = sns.color_palette()
# IPython magic: draw matplotlib figures inline in the notebook output.
%matplotlib inline


In [None]:
# Reproducibility setup: seed every random source the notebook uses with the same
# value and restrict TensorFlow to one thread per op pool.
SEED = 111
tf1 = tf.compat.v1

# NOTE(review): PYTHONHASHSEED is read at interpreter start-up, so setting it here
# presumably only affects child processes -- confirm this is intended.
os.environ['PYTHONHASHSEED'] = '0'
np.random.seed(SEED)

session_conf = tf1.ConfigProto(
    intra_op_parallelism_threads=1,
    inter_op_parallelism_threads=1,
)
tf1.set_random_seed(SEED)
sess = tf1.Session(graph=tf1.get_default_graph(), config=session_conf)
K.set_session(sess)

# imgaug keeps its own random state, seeded separately.
aug.seed(SEED)

In [None]:
# Root of the chest X-ray dataset and its three split folders.
data_path = Path('/kaggle/input/chest-xray-pneumonia/chest_xray/chest_xray')

train_path, test_path, val_path = (
    data_path.joinpath(split) for split in ('train', 'test', 'val')
)

In [None]:
# Show the three split folders, one per line.
print(train_path, test_path, val_path, sep='\n')

In [None]:
# Build the training table: one row per image with its path and class label.
normal_train_path = train_path / 'NORMAL'
pneumonia_train_path = train_path / 'PNEUMONIA'

# Directory listing order depends on the filesystem; sorting makes the input to the
# seeded shuffle below identical from run to run.
normal_cases_train = sorted(normal_train_path.glob('*.jpeg'))
pneumonia_cases_train = sorted(pneumonia_train_path.glob('*.jpeg'))

# 0: label for normal cases, 1: label for pneumonia cases
train_data = pd.DataFrame(
    [(img, 0) for img in normal_cases_train]
    + [(img, 1) for img in pneumonia_cases_train],
    columns=['image', 'label'],
)

# Shuffle the rows (uses numpy's global random state) and renumber the index.
train_data = train_data.sample(frac=1).reset_index(drop=True)

train_data.head()

In [None]:
# Class balance of the training set: number of images per label.
count_cases = train_data['label'].value_counts()
print(count_cases)

class_ids = count_cases.index
tick_labels = ['Normal(0)', 'Pneumonia(1)']

plt.figure(figsize=(10,8))
sns.barplot(x=class_ids, y=count_cases.values)
plt.title("Number of cases", fontsize=14)
plt.xlabel('Case type', fontsize=12)
plt.ylabel('Count', fontsize=12)
# NOTE(review): tick labels are assigned by bar position, not by label value --
# confirm the bars are drawn in 0, 1 order.
plt.xticks(range(len(class_ids)), tick_labels)
plt.show()

In [None]:
# First five image paths of each class: normal (0) first, then pneumonia (1).
samples = [
    image_path
    for class_label in (0, 1)
    for image_path in train_data.loc[train_data['label'] == class_label, 'image'].iloc[:5].tolist()
]

# 2 x 5 grid: top row normal, bottom row pneumonia.
f, ax = plt.subplots(2, 5, figsize=(30,10))
for i in range(10):
    panel = ax[divmod(i, 5)]
    panel.imshow(imread(samples[i]), cmap='gray')
    panel.set_title("Normal" if i < 5 else "Pneumonia")
    panel.axis('off')
    panel.set_aspect('auto')
plt.show()

In [None]:
import cv2

# Validation split folders, one per class.
normal_cases_val_path = val_path.joinpath('NORMAL')
pneumonia_cases_val_path = val_path.joinpath('PNEUMONIA')

# Lazy iterators over the JPEG files of each class.
normal_cases_val, pneumonia_cases_val = (
    class_dir.glob('*.jpeg')
    for class_dir in (normal_cases_val_path, pneumonia_cases_val_path)
)

# Accumulators for the preprocessed images and their one-hot labels.
val_data, val_labels = [], []

def convert_grayscale(img):
    """Turn a decoded image into network input: 224x224, 3 channels, RGB order, float32.

    Despite the name, the result is a 3-channel image; a single-channel input is
    replicated across three channels.

    Args:
        img: image array as produced by ``cv2.imread`` (BGR channel order), either
            2-D (grayscale) or 3-D with 1 or 3 channels.

    Returns:
        Array of shape (224, 224, 3), dtype float32, pixel values divided by 255.

    Raises:
        ValueError: if ``img`` is None (``cv2.imread`` returns None for unreadable files).
    """
    if img is None:
        raise ValueError("convert_grayscale received None; the image file could not be read")
    img = cv2.resize(img, (224, 224))
    # A grayscale image can reach this point as a 2-D array, where indexing
    # shape[2] would raise IndexError, so test the rank first.
    if img.ndim == 2 or img.shape[2] == 1:
        img = np.dstack([img, img, img])
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.astype(np.float32)/255.
    return img

# Load every validation image; class index 0 = normal, 1 = pneumonia.
for class_index, case_paths in enumerate((normal_cases_val, pneumonia_cases_val)):
    for img_path in case_paths:
        val_data.append(convert_grayscale(cv2.imread(str(img_path))))
        val_labels.append(to_categorical(class_index, num_classes=2))

# Stack the per-image lists into arrays for Keras.
val_data, val_labels = np.array(val_data), np.array(val_labels)

print("Total number of validation examples:", val_data.shape)
print("total number of labels:", val_labels.shape)

In [None]:
import cv2

# Preview five validation images per class.
normal_cases_val_path = val_path / 'NORMAL'
pneumonia_cases_val_path = val_path / 'PNEUMONIA'

# Sorted so that the shuffle below starts from the same order on every run
# (directory listing order depends on the filesystem).
normal_cases_val = sorted(normal_cases_val_path.glob('*.jpeg'))
pneumonia_cases_val = sorted(pneumonia_cases_val_path.glob('*.jpeg'))

# 0: label for normal cases, 1: label for pneumonia cases
vdl = [(img, 0) for img in normal_cases_val] + [(img, 1) for img in pneumonia_cases_val]

vd = pd.DataFrame(vdl, columns=['image', 'label'], index=None)
vd = vd.sample(frac=1).reset_index(drop=True)


vnormal_samples = (vd[vd['label'] == 0]['image'].iloc[:5]).tolist()
vpneumonia_samples = (vd[vd['label'] == 1]['image'].iloc[:5]).tolist()

vsamples = vnormal_samples + vpneumonia_samples
del vnormal_samples, vpneumonia_samples

# 2 x 5 grid: top row normal, bottom row pneumonia.
f, ax = plt.subplots(2, 5, figsize=(30,10))
for i in range(10):
    img = imread(vsamples[i])
    ax[i//5, i%5].imshow(img, cmap='gray')
    if i<5:
        ax[i//5, i%5].set_title("Normal")
    else:
        ax[i//5, i%5].set_title("Pneumonia")
    ax[i//5, i%5].axis('off')
    ax[i//5, i%5].set_aspect('auto')
plt.show()

In [None]:
# Augmentation pipeline: OneOf wraps two child augmenters -- an Affine with rotate=20
# and a Multiply with the range (1.2, 1.5).
augmenters = [
    iaa.Affine(rotate=20),
    iaa.Multiply((1.2, 1.5)),
]
seq = iaa.OneOf(augmenters)

In [None]:
def data_gen(data, batch_size):
    """Endlessly yield training batches, oversampling normal cases with augmentation.

    Each image taken from ``data`` is resized to 224x224, converted to RGB and scaled
    by 1/255. For a normal image (label 0) two augmented copies produced by the
    module-level ``seq`` augmenter are added as well, as long as both still fit in
    the batch.

    Args:
        data: DataFrame with an 'image' column (file paths) and a 'label' column
            (0 = normal, 1 = pneumonia).
        batch_size: number of samples per yielded batch.

    Yields:
        (batch_data, batch_labels): float32 arrays of shape
        (batch_size, 224, 224, 3) and (batch_size, 2). Batches are shorter only
        when ``data`` holds fewer rows than needed to fill one.

    Raises:
        ValueError: if ``data`` is empty, ``batch_size`` is not positive, or an
            image file cannot be read.
    """
    n = len(data)
    if n == 0:
        raise ValueError("data_gen needs at least one sample")
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    steps = n//batch_size

    indices = np.arange(n)
    i = 0
    while True:
        # Reshuffle once per pass over the data; reshuffling before every batch
        # made consecutive slices overlap, so a pass did not visit each slice once.
        if i == 0:
            np.random.shuffle(indices)

        # Fresh arrays per batch: nothing is left over from the previous batch and
        # a batch already handed to the consumer is never overwritten.
        batch_data = np.zeros((batch_size, 224, 224, 3), dtype=np.float32)
        batch_labels = np.zeros((batch_size, 2), dtype=np.float32)
        count = 0
        next_batch = indices[(i*batch_size):(i+1)*batch_size]

        for idx in next_batch:
            sample = data.iloc[idx]
            img_name = sample['image']
            label = sample['label']

            encoded_label = tf.keras.utils.to_categorical(label, num_classes=2)
            img = cv2.imread(str(img_name))
            if img is None:
                raise ValueError("could not read image: {}".format(img_name))
            img = cv2.resize(img, (224,224))
            if img.ndim == 2:
                img = np.dstack([img, img, img])

            # Scale the RGB-converted image (previously the BGR array was scaled
            # and the conversion result was thrown away).
            orig_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            orig_img = orig_img.astype(np.float32)/255.

            batch_data[count] = orig_img
            batch_labels[count] = encoded_label
            count += 1

            # Oversample the normal class with two augmented copies when both fit.
            if label == 0 and count + 2 <= batch_size:
                aug_img_1 = seq.augment_image(img)
                aug_img_2 = seq.augment_image(img)
                for aug_img in (aug_img_1, aug_img_2):
                    batch_data[count] = convert_grayscale(aug_img)
                    batch_labels[count] = encoded_label
                    # Advance past every slot written so that no augmented copy
                    # is overwritten by the next image.
                    count += 1

            # Stop only once every slot of the batch is filled.
            if count >= batch_size:
                break

        i += 1
        yield batch_data[:count], batch_labels[:count]

        if i >= steps:
            i = 0

In [None]:
def build_model():
    """Build the functional-API CNN: two plain conv layers, three separable-conv
    stages and a dense head.

    Returns:
        An uncompiled Model mapping a (224, 224, 3) input to 2 softmax outputs.
    """
    conv_args = dict(activation='relu', padding='same')

    inputs = Input(shape=(224,224,3), name='ImageInput')

    # Stage 1: regular convolutions.
    features = Conv2D(64, (3,3), name='Conv1_1', **conv_args)(inputs)
    features = Conv2D(64, (3,3), name='Conv1_2', **conv_args)(features)
    features = MaxPooling2D((2,2), name='pool1')(features)

    # Stage 2: separable convolutions without batch normalisation.
    for conv_index in (1, 2):
        features = SeparableConv2D(128, (3,3), name='Conv2_{}'.format(conv_index), **conv_args)(features)
    features = MaxPooling2D((2,2), name='pool2')(features)

    # Stages 3 and 4: three separable convolutions each, with batch normalisation
    # after the first two of every stage (bn1..bn4).
    bn_index = 0
    for stage, filters in ((3, 256), (4, 512)):
        for conv_index in (1, 2, 3):
            features = SeparableConv2D(filters, (3,3), name='Conv{}_{}'.format(stage, conv_index), **conv_args)(features)
            if conv_index < 3:
                bn_index += 1
                features = BatchNormalization(name='bn{}'.format(bn_index))(features)
        features = MaxPooling2D((2,2), name='pool{}'.format(stage))(features)

    # Classifier head.
    features = Flatten(name='flatten')(features)
    features = Dense(1024, activation='relu', name='fc1')(features)
    features = Dropout(0.7, name='dropout1')(features)
    features = Dense(512, activation='relu', name='fc2')(features)
    features = Dropout(0.5, name='dropout2')(features)
    outputs = Dense(2, activation='softmax', name='fc3')(features)

    return Model(inputs=inputs, outputs=outputs)

In [None]:
def build_model_2():
    """Build the small Sequential CNN: one Conv2D stage, one SeparableConv2D stage
    and a two-layer dense head.

    Returns:
        An uncompiled tf.keras.Sequential mapping (224, 224, 3) inputs to 2 softmax outputs.
    """
    layers = tf.keras.layers
    conv_args = dict(activation='relu', padding='same')

    stack = [tf.keras.Input(shape=(224, 224, 3))]

    # Convolutional stages: (conv + batch norm) x 2, max pooling, dropout.
    # NOTE: a third stage (SeparableConv2D with 64 filters, followed by Dropout(0.3))
    # is disabled and therefore not built here.
    for conv_layer, filters, drop_rate in ((layers.Conv2D, 16, 0.1),
                                           (layers.SeparableConv2D, 32, 0.3)):
        for _ in range(2):
            stack.append(conv_layer(filters, 3, **conv_args))
            stack.append(layers.BatchNormalization())
        stack.append(layers.MaxPool2D())
        stack.append(layers.Dropout(drop_rate))

    stack.append(layers.Flatten())
    stack.append(layers.Dropout(0.7))

    # Dense head: dense + batch norm + dropout, twice.
    for units, drop_rate in ((32, 0.5), (16, 0.3)):
        stack.append(layers.Dense(units, activation='relu'))
        stack.append(layers.BatchNormalization())
        stack.append(layers.Dropout(drop_rate))

    stack.append(layers.Dense(2, activation='softmax'))

    return tf.keras.Sequential(stack)


In [None]:
def build_model_3():
    """Build the five-stage Sequential CNN with a single 128-unit dense layer.

    Returns:
        An uncompiled tf.keras.Sequential mapping (224, 224, 3) inputs to 2 sigmoid outputs.
    """
    layers = tf.keras.layers

    # (filters, dropout rate or None) per stage; each stage is
    # conv -> [dropout] -> batch norm -> max pooling.
    stage_spec = ((32, None), (64, 0.1), (64, None), (128, 0.2), (256, 0.2))

    stack = [tf.keras.Input(shape=(224, 224, 3))]
    for filters, drop_rate in stage_spec:
        stack.append(layers.Conv2D(filters, 3, activation='relu', padding='same'))
        if drop_rate is not None:
            stack.append(layers.Dropout(drop_rate))
        stack.append(layers.BatchNormalization())
        stack.append(layers.MaxPool2D())

    stack.extend([
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        # NOTE(review): the other two builders end in softmax; this one uses
        # sigmoid -- confirm that is intended.
        layers.Dense(2, activation='sigmoid'),
    ])

    return tf.keras.Sequential(stack)


In [None]:
# Instantiate the architecture used for the rest of the notebook and print its layer table.
model = build_model_2()
model.summary()

In [None]:
# Initialise the first convolution layers from pretrained VGG16 weights where possible.
#
# The previous code did `layer.set_weights = [w, b]`, which only overwrote the method
# with a list and loaded nothing. The weights are now passed to set_weights(), but only
# when their shapes match the target layer, so an incompatible layer is reported and
# skipped instead of raising.
VGG16_NOTOP_WEIGHTS = '../input/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5'

# (index into model.layers, VGG16 block name in the weights file)
vgg_layer_map = [
    (1, 'block1_conv1'),
    (2, 'block1_conv2'),
    (4, 'block2_conv1'),
    (5, 'block2_conv2'),
]

# The context manager closes the file even if a lookup fails.
with h5py.File(VGG16_NOTOP_WEIGHTS, 'r') as f:
    for layer_index, vgg_name in vgg_layer_map:
        layer = model.layers[layer_index]
        group = f[vgg_name]
        # Copy into memory now; the HDF5 datasets are unusable once the file is closed.
        pretrained = [np.array(group[vgg_name + '_W_1:0']),
                      np.array(group[vgg_name + '_b_1:0'])]

        current_shapes = [w.shape for w in layer.get_weights()]
        pretrained_shapes = [w.shape for w in pretrained]
        if current_shapes == pretrained_shapes:
            layer.set_weights(pretrained)
            print('Loaded {} into layer {} ({})'.format(vgg_name, layer_index, layer.name))
        else:
            print('Skipped {}: layer {} ({}) expects {}, file has {}'.format(
                vgg_name, layer_index, layer.name, current_shapes, pretrained_shapes))

model.summary()

In [None]:
# Optimizer, callbacks and metrics for training.
opt = Adam(learning_rate=0.0001, decay=1e-5)
es = EarlyStopping(patience=5)

# Labels are one-hot over two classes. Without class_id, Precision and Recall pool both
# output columns and end up equal to accuracy; class_id=1 makes them report the
# pneumonia class (label 1).
metrics = ['accuracy',
           tf.keras.metrics.Precision(name='precision', class_id=1),
           tf.keras.metrics.Recall(name='recall', class_id=1)]

# Keep only the weights of the best epoch seen so far.
checkpoint = ModelCheckpoint(filepath='best_model_todate4',
                             save_best_only=True, save_weights_only=True)
model.compile(loss='binary_crossentropy', metrics=metrics, optimizer=opt)

In [None]:
# Training configuration.
batch_size = 16
n_epochs = 20

train_data_gen = data_gen(data=train_data, batch_size=batch_size)
# One epoch = one pass over the training table in batch_size-sized slices.
n_train_steps = train_data.shape[0]//batch_size

# len(val_data) is the number of validation images, not a step count, so label it as such.
print('Number of training steps: {}; number of validation examples: {}'.format(n_train_steps, len(val_data)))

In [None]:
# Train from the generator, validating on the in-memory validation arrays after each epoch.
fit_options = dict(
    epochs=n_epochs,
    steps_per_epoch=n_train_steps,
    validation_data=(val_data, val_labels),
    callbacks=[es, checkpoint],
)
history = model.fit(train_data_gen, **fit_options)

In [None]:
# Restore the weights saved by the ModelCheckpoint callback, which writes to this same path.
model.load_weights("best_model_todate4")

In [None]:
# Test split folders, one per class.
normal_cases_path = test_path.joinpath('NORMAL')
pneumonia_cases_path = test_path.joinpath('PNEUMONIA')

# Lazy iterators over the JPEG files of each class.
normal_cases, pneumonia_cases = (
    class_dir.glob('*.jpeg')
    for class_dir in (normal_cases_path, pneumonia_cases_path)
)

test_data, test_labels = [], []

# Load every test image; class index 0 = normal, 1 = pneumonia.
for class_index, case_paths in enumerate((normal_cases, pneumonia_cases)):
    for img_path in case_paths:
        test_data.append(convert_grayscale(cv2.imread(str(img_path))))
        test_labels.append(to_categorical(class_index, num_classes=2))

# Stack the per-image lists into arrays for Keras.
test_data, test_labels = np.array(test_data), np.array(test_labels)

print('Total number of test examples:', test_data.shape)
print('total number of labels:', test_labels.shape)

In [None]:
# Loss followed by the compiled metrics, computed on the test set.
results = model.evaluate(x=test_data, y=test_labels, batch_size=16)
print('Res:', results)

In [None]:
# Predicted class = index of the larger of the two output scores.
preds = model.predict(test_data, batch_size=16).argmax(axis=-1)

# Collapse the one-hot test labels back to class indices.
org_test_labels = test_labels.argmax(axis=-1)

print(org_test_labels.shape, preds.shape, sep='\n')

In [None]:
# Rows are true classes, columns are predicted classes. labels=[0, 1] keeps the matrix
# 2x2 even if one class never appears in the labels or predictions.
cm = confusion_matrix(org_test_labels, preds, labels=[0, 1])

# plot_confusion_matrix is given figsize and draws on its own figure, so no plt.figure()
# call is made first (that call only left an extra empty figure in the output).
plot_confusion_matrix(cm, figsize=(12,8),
                      hide_ticks=True, cmap=plt.cm.Blues)
plt.xticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.yticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
plt.show()

In [None]:
# Unpack the 2x2 confusion matrix row by row; pneumonia (label 1) is the positive class.
(tn, fp), (fn, tp) = cm

predicted_positive = tp + fp
actual_positive = tp + fn

precision = tp/predicted_positive
recall = tp/actual_positive

print(tn, fp, fn, tp)
print("Recall of the model is {:.2f}".format(recall))
print("Precision of the model is {:.2f}".format(precision))