# Case 2 with Kaggle
Neural Networks for Machine Learning Applications<br>
26.2.2022, Xincheng Ni<br>
[Information Technology](https://metropolia.fi/en/academics/bachelors-degrees/information-technology)<br>
[Metropolia University of Applied Sciences](https://metropolia.fi/en)

The basic structure of this solution is taken from Tensorflow > Learn > Tensorflow Core > Tutorials > [Image Classification](https://www.tensorflow.org/tutorials/images/classification). See that for more detailed instructions.

# Introduction

Pneumonia is an infection that inflames the air sacs in one or both lungs. The air sacs may fill with fluid or pus (purulent material), causing cough with phlegm or pus, fever, chills, and difficulty breathing. A variety of organisms, including bacteria, viruses and fungi, can cause pneumonia.

In this case, a dataset of [Chest X-Ray Images](https://www.kaggle.com/paultimothymooney/chest-xray-pneumonia) are studied using  Convolutional Neural Network, to classify pneumonia X-Ray images. A simple `CNN model`, `VGG16` based model and `ResNet152V2` based model are used for practicing coding skill and find the best accuracy.

The dadaset consists of 5216 training samples and 624 tesing samples, labeled as "NORMAL" and "PNEUMONIA" in the subdirectory.

NOTE: The result can still be improved, need more experiment and tuning, see 'Conclusion' part.

# Setup


In [None]:
%pylab inline
import os
import numpy as np 
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers.experimental.preprocessing import Rescaling
from sklearn.utils import class_weight
from sklearn.metrics import confusion_matrix, classification_report
print("tensorflow", tf.__version__)

# Dataset

In [None]:
# Import dataset
train_dir = '../input/chest-xray-pneumonia/chest_xray/train'
test_dir = '../input/chest-xray-pneumonia/chest_xray/test'
val_dir = '../input/chest-xray-pneumonia/chest_xray/val'

# Count normal and pneumonia samples for each dataset
num_train_normal = len(os.listdir(os.path.join(train_dir, 'NORMAL')))
num_train_pneumonia = len(os.listdir(os.path.join(train_dir, 'PNEUMONIA')))
print("Train set:")
print(f"Normal:    {num_train_normal}")
print(f"Pneumonia: {num_train_pneumonia}")

num_test_normal = len(os.listdir(os.path.join(test_dir, 'NORMAL')))
num_test_pneumonia = len(os.listdir(os.path.join(test_dir, 'PNEUMONIA')))
print("\nTest set:")
print(f"Normal:    {num_test_normal}")
print(f"Pneumonia: {num_test_pneumonia}")

### Visualize X-Ray images

In [None]:
normal_dir = os.path.join(train_dir, 'NORMAL')
normal = os.listdir(normal_dir)

plt.figure(figsize=(10, 10))
for i in range(3):
    plt.subplot(1, 3, i + 1)
    img = plt.imread(os.path.join(normal_dir, normal[i]))
    plt.imshow(img)
    plt.title('Normal')

In [None]:
pneumonia_dir = os.path.join(train_dir, 'PNEUMONIA')
pneumonia = os.listdir(pneumonia_dir)

plt.figure(figsize=(10, 10))
for i in range(3):
    plt.subplot(1, 3, i + 1)
    img = plt.imread(os.path.join(pneumonia_dir, pneumonia[i]))
    plt.imshow(img)
    plt.title('Pneumonia')

Note the images above has various resolution. The difference between normal and pneumonia infection images is very difficult to tell for non-specialist.

# Preprocessing

In [None]:
# This block is for raising brightness and contrast for future experiment
img_01 = '../input/chest-xray-pneumonia/chest_xray/test/NORMAL/IM-0001-0001.jpeg'
imggg = plt.imread(img_01)

imggg_3d = tf.expand_dims(imggg,2)

brighten = tf.image.adjust_brightness(imggg_3d, delta=0.2)
contrasten = tf.image.adjust_contrast(imggg_3d, 2.)
print(imggg.shape, imggg_3d.shape)

plt.imshow(imggg)
plt.title("Original img")
plt.show()

plt.imshow(brighten)
plt.title("Brightened img")
plt.show()

plt.imshow(contrasten)
plt.title("Contranst enhanced img")
plt.show()

Constant settings.

In [None]:
# Training settings
BATCH_SIZE = 32
IMG_HEIGHT, IMG_WIDTH = 240, 240
IMG_SIZE = (IMG_HEIGHT, IMG_WIDTH)

N = 10           # epoch
STEPS_PER_EPOCH = 4173 / BATCH_SIZE
VALIDATION_STEPS = 1043 / BATCH_SIZE
METRICS = ['accuracy']

Use `ImageDataGenerator` for preprocessing and augmenting data.

rescale:             Rescale pixel value into 0-1.<br>
rotation_range:      Rotate image for random degree range.<br>
width_shift_range:   Shift image horizontally.<br>
height_shift_range:  Shift image vertically.<br>
shear_range:         Shear angle in counter-clockwise direction in degrees.<br>
zoom_range:          Zoom for random range.

In [None]:
def custom_augmentation(np_tensor):
 
    def random_contrast(np_tensor):
        return tf.image.random_contrast(np_tensor, 0.5, 2)

    augmnted_tensor = random_contrast(np_tensor)
    return np.array(augmnted_tensor)

train_datagen = ImageDataGenerator(rescale = 1./255,
                                   rotation_range = 20,
                                   width_shift_range = 0.2,
                                   height_shift_range = 0.2,
                                   shear_range = 0.2,
                                   zoom_range = 0.2,
                                   brightness_range = (1, 1.2),
                                   preprocessing_function=custom_augmentation,
                                   validation_split = 0.2)

test_datagen = ImageDataGenerator(rescale = 1./255)

Iterate through dataset directories and prepare data.

In [None]:
train_ds = train_datagen.flow_from_directory(train_dir,
                                             subset = "training",
                                             class_mode = 'binary',
#                                              shuffle = True,
                                             seed = 123,
                                             target_size = IMG_SIZE,
                                             batch_size = BATCH_SIZE)

val_ds = train_datagen.flow_from_directory(train_dir,
                                           subset = "validation",
                                           class_mode = 'binary',
                                           shuffle = False,
                                           seed = 123,
                                           target_size = IMG_SIZE,
                                           batch_size = BATCH_SIZE)

test_ds = test_datagen.flow_from_directory(test_dir,
                                           class_mode = 'binary',
                                           shuffle = False,
                                           target_size = IMG_SIZE,
                                           batch_size = BATCH_SIZE)

### Speed up

In [None]:
# # Speeding up the data processing
# AUTOTUNE = tf.data.experimental.AUTOTUNE
# train_datagen = train_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
# val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

# Model 1 - Simple CNN model

### Modeling

Building a simple CNN model. Apply dropout after flatten layer.

In [None]:
cls_wt = class_weight.compute_class_weight('balanced',
                                           np.unique(train_ds.classes),
                                           train_ds.classes)
class_weights = {0: cls_wt[0], 1:cls_wt[1]}
print(class_weights)


In [None]:
model_1 = Sequential([
#     layers.experimental.preprocessing.RandomContrast(factor=0.2, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),

    layers.Conv2D(64, 3, padding='same', activation='relu', input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    layers.BatchNormalization(),
#     layers.Conv2D(32, 3, padding='same', activation='relu'),
#     layers.BatchNormalization(),
    layers.MaxPooling2D(),
    
#     layers.Conv2D(64, 3, padding='same', activation='relu'),
#     layers.BatchNormalization(),
    layers.Conv2D(128, 3, padding='same', activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D(),
    
#     layers.Conv2D(128, 3, padding='same', activation='relu'),
#     layers.BatchNormalization(),
    layers.Conv2D(256, 3, padding='same', activation='relu'),
    layers.BatchNormalization(),
    layers.MaxPooling2D(),
    
    layers.Flatten(),
    layers.Dense(512, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),

    layers.Dense(1, activation = 'sigmoid')
])

model_1.compile(optimizer = optimizers.RMSprop(learning_rate = 1e-4),
                loss = 'binary_crossentropy',
                metrics = METRICS)

In [None]:
model_1.summary()

Create checkpoint callback to save best result.

In [None]:
checkpoint_filepath = "model_1_{epoch:02d}-{val_accuracy:.2f}.hdf5"

checkpoint_model_1 = ModelCheckpoint(filepath=os.path.join("./case2/checkpoint", checkpoint_filepath), 
                                     monitor='val_accuracy',
                                     verbose=0, 
                                     save_best_only=True)
callbacks_1 = [checkpoint_model_1]

### Training

In [None]:
%%time

h1 = model_1.fit(train_ds,
                validation_data = val_ds,
                batch_size = BATCH_SIZE,
                steps_per_epoch = STEPS_PER_EPOCH,
                validation_steps = VALIDATION_STEPS,
                 class_weight = class_weights,
                verbose = 1, # FOR FINAL VERSION, CHANGE TO 0!
                epochs = N,
                callbacks = checkpoint_model_1)

model_filepath = os.path.join("./case2/model", "model_1.h5")
model_1.save(model_filepath)

In [None]:
print(f"Mean Loss:         {np.array(h1.history['loss']).mean()}")
print(f"Mean Accuracy:     {np.array(h1.history['accuracy']).mean()}")
print(f"Mean Val-Loss:     {np.array(h1.history['val_loss']).mean()}")
print(f"Mean Val-Accuracy: {np.array(h1.history['val_accuracy']).mean()}")

Plot graphs for training history.

In [None]:
x = range(1, N+1)

figure(figsize(13, 5))
subplot(1, 2, 1)
plot(x, h1.history['loss'], '*-', label = 'training')
plot(x, h1.history['val_loss'], 'o-', label = 'validation')
title('loss')
ylim(0, )
legend()
grid()

subplot(1, 2, 2)
plot(x, h1.history['accuracy'], '*-', label = 'training')
plot(x, h1.history['val_accuracy'], 'o-', label = 'validation')
title('accuracy')
ylim(0.5, 1.0)
legend()
grid()

### Result

In [None]:
model_1.evaluate(test_ds)

In [None]:
m1_pred = (model_1.predict(test_ds) > 0.5).astype("int32")

In [None]:
cm = confusion_matrix(test_ds.classes, m1_pred)

sen = cm[0][0] / ((cm[0][0]) + (cm[0][1]))
spe = cm[1][1] / ((cm[1][0]) + (cm[1][1]))

print(cm)
print(f"Sensitivity: {sen}")
print(f"Specificity: {spe}")
print(classification_report(test_ds.classes, m1_pred, target_names=['Normal - 0', 'Pneumonia - 1']))

# Model 2 - VGG16

### Modeling

Building a VGG16 based model.

In [None]:
from tensorflow.keras.applications.vgg16 import VGG16

vgg16_base = VGG16(weights='imagenet', include_top=False,
                  input_shape=(IMG_HEIGHT, IMG_WIDTH,3), pooling='avg')
vgg16_base.trainable = False

# vgg16_base.summary()

In [None]:
vgg16_model = tf.keras.Sequential([vgg16_base,
                                   layers.Dense(128, activation="relu"),
                                   layers.Dropout(0.2),
                                   layers.BatchNormalization(),
                                   layers.Dense(64,activation="relu"),
                                   layers.Dropout(0.2),
                                   layers.BatchNormalization(),
                                   layers.Dense(1,activation="sigmoid")])

vgg16_model.compile(optimizer=optimizers.Adam(learning_rate=0.001),
                    loss='binary_crossentropy',
                    metrics=METRICS)

vgg16_model.summary()

In [None]:
checkpoint_filepath = "vgg16_model_{epoch:02d}-{val_accuracy:.2f}.hdf5"

checkpoint_model_vgg16 = ModelCheckpoint(filepath=os.path.join("./case2/checkpoint", checkpoint_filepath), 
                                     monitor='val_accuracy',
                                     verbose=0, 
                                     save_best_only=True)
callbacks_2 = [checkpoint_model_vgg16]

### Training

In [None]:
%%time

h2 = vgg16_model.fit(train_ds,
                     validation_data = val_ds,
                     batch_size = BATCH_SIZE,
                     steps_per_epoch = STEPS_PER_EPOCH,
                     validation_steps = VALIDATION_STEPS,
                     verbose = 1, # FOR FINAL VERSION, CHANGE TO 0!
                     epochs = N,
                     callbacks = callbacks_2)

model_filepath = os.path.join("./case2/model", "vgg16_model.h5")
vgg16_model.save(model_filepath)

In [None]:
print(f"Mean Loss:         {np.array(h2.history['loss']).mean()}")
print(f"Mean Accuracy:     {np.array(h2.history['accuracy']).mean()}")
print(f"Mean Val-Loss:     {np.array(h2.history['val_loss']).mean()}")
print(f"Mean Val-Accuracy: {np.array(h2.history['val_accuracy']).mean()}")

In [None]:
figure(figsize(13, 5))
subplot(1, 2, 1)
plot(x, h2.history['loss'], '*-', label = 'training')
plot(x, h2.history['val_loss'], 'o-', label = 'validation')
title('loss')
ylim(0, )
legend()
grid()

subplot(1, 2, 2)
plot(x, h2.history['accuracy'], '*-', label = 'training')
plot(x, h2.history['val_accuracy'], 'o-', label = 'validation')
title('accuracy')
ylim(0.5, 1.0)
legend()
grid()

### Result

In [None]:
vgg16_model.evaluate(test_ds)

In [None]:
m2_pred = (vgg16_model.predict(test_ds) > 0.5).astype("int32")

In [None]:
cm = confusion_matrix(test_ds.classes, m2_pred)
sen = cm[0][0] / ((cm[0][0]) + (cm[0][1]))
spe = cm[1][1] / ((cm[1][0]) + (cm[1][1]))

print(cm)
print(f"Sensitivity: {sen}")
print(f"Specificity: {spe}")
print(classification_report(test_ds.classes, m2_pred, target_names=['Normal - 0', 'Pneumonia - 1']))

# Model 3 - EfficientNetB5

### Modeling

Building a EfficientNetB5 based model.

In [None]:
from tensorflow.keras.applications.resnet_v2 import ResNet152V2

ResNet152V2_base = ResNet152V2(weights='imagenet', include_top=False,
                  input_shape=(IMG_HEIGHT, IMG_WIDTH,3), pooling='avg')
ResNet152V2_base.trainable = False

# ResNet152V2_base.summary()

In [None]:
ResNet152V2_model = tf.keras.Sequential([ResNet152V2_base,
                                         layers.Dense(128,activation="relu"),
                                         layers.Dropout(0.2),
                                         layers.BatchNormalization(),
                                         layers.Dense(1,activation="sigmoid")])

ResNet152V2_model.compile(optimizer=optimizers.Adam(learning_rate=0.001),
                    loss='binary_crossentropy',
                    metrics=METRICS)

ResNet152V2_model.summary()

In [None]:
checkpoint_filepath = "ResNet152V2_model_{epoch:02d}-{val_accuracy:.2f}.hdf5"

checkpoint_model_ResNet152V2 = ModelCheckpoint(filepath=os.path.join("./case2/checkpoint", checkpoint_filepath), 
                                     monitor='val_accuracy',
                                     verbose=0, 
                                     save_best_only=True)
callbacks_3 = [checkpoint_model_ResNet152V2]

### Training

In [None]:
%%time

h3 = ResNet152V2_model.fit(train_ds,
                        validation_data = val_ds,
                        batch_size = BATCH_SIZE,
                        steps_per_epoch = STEPS_PER_EPOCH,
                        validation_steps = VALIDATION_STEPS,
                        verbose = 1, # FOR FINAL VERSION, CHANGE TO 0!
                        epochs = N,
                        callbacks = callbacks_3)

model_filepath = os.path.join("./case2/model", "ResNet152V2_model.h5")
ResNet152V2_model.save(model_filepath)

In [None]:
print(f"Mean Loss:         {np.array(h3.history['loss']).mean()}")
print(f"Mean Accuracy:     {np.array(h3.history['accuracy']).mean()}")
print(f"Mean Val-Loss:     {np.array(h3.history['val_loss']).mean()}")
print(f"Mean Val-Accuracy: {np.array(h3.history['val_accuracy']).mean()}")

In [None]:
figure(figsize(13, 5))
subplot(1, 2, 1)
plot(x, h3.history['loss'], '*-', label = 'training')
plot(x, h3.history['val_loss'], 'o-', label = 'validation')
title('loss')
ylim(0, )
legend()
grid()

subplot(1, 2, 2)
plot(x, h3.history['accuracy'], '*-', label = 'training')
plot(x, h3.history['val_accuracy'], 'o-', label = 'validation')
title('accuracy')
ylim(0.5, 1.0)
legend()
grid()

### Result

In [None]:
ResNet152V2_model.evaluate(test_ds)

In [None]:
m3_pred = (ResNet152V2_model.predict(test_ds) > 0.5).astype("int32")

In [None]:
cm = confusion_matrix(test_ds.classes, m3_pred)
sen = cm[0][0] / ((cm[0][0]) + (cm[0][1]))
spe = cm[1][1] / ((cm[1][0]) + (cm[1][1]))

print(cm)
print(f"Sensitivity: {sen}")
print(f"Specificity: {spe}")
print(classification_report(test_ds.classes, m3_pred, target_names=['Normal - 0', 'Pneumonia - 1']))

# Conclusion

For further improvement, I would try:
1. Bigger image size e.g(300 * 300)
2. Higher contrast or brightness
3. Using grayscale
4. Reduce layers/neurons to fight overfitting

Helpful links:  
[VGG16_grayscale](https://github.com/DaveRichmond-/grayscale-imagenet/tree/master/models)  
[imagenet_grayscale](https://github.com/DaveRichmond-/grayscale-imagenet)