In [None]:
import numpy as np
import pandas as pd
import random as rn
from tqdm.auto import tqdm

from keras.preprocessing import image
from skimage.io import imread
from keras.utils import np_utils
import os
from google.colab import drive
from pathlib import Path

import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping,ModelCheckpoint,ReduceLROnPlateau
from keras.preprocessing.image import ImageDataGenerator
from keras.optimizers import Adam
from tensorflow.keras.utils import plot_model, to_categorical

import seaborn as sns
import matplotlib.pyplot as plt

import cv2

In [None]:
# Seed NumPy, the stdlib `random` module and TensorFlow with one shared value.
seed = 32152339
for seed_rng in (np.random.seed, rn.seed, tf.random.set_seed):
    seed_rng(seed)

In [None]:
# Report the TensorFlow / Keras versions this run uses.
print('tensorflow version :',tf.__version__)
print('keras version :',tf.keras.__version__)

# data load

In [None]:
# Mount Google Drive at /gdrive; force_remount=True asks Colab for a fresh mount
# even when the mount point is already in use.
drive.mount('/gdrive', force_remount=True)

In [None]:
# Root folder of the dataset on the mounted Drive.
data_dir = Path('/gdrive/MyDrive/Colab Notebooks/kaggle_pneumonia_classification')

# One sub-folder per split, built with pathlib's `/` operator.
train_dir, val_dir, test_dir = (data_dir / split for split in ('train', 'val', 'test'))

In [None]:
# Collect the training image paths together with their labels
# (0 = NORMAL folder, 1 = PNEUMONIA folder).
# The glob results are sorted because directory listing order is not guaranteed;
# the seeded shuffle below then always starts from the same sequence.
normal_cases_dir = train_dir / 'NORMAL'
pneumonia_cases_dir = train_dir / 'PNEUMONIA'
normal_cases = sorted(normal_cases_dir.glob('*.jpeg'))
pneumonia_cases = sorted(pneumonia_cases_dir.glob('*.jpeg'))

# One (path, label) record per image: normal images first, then pneumonia.
train_data = [(img, 0) for img in normal_cases] + [(img, 1) for img in pneumonia_cases]

# Convert the records to a DataFrame.
train_data = pd.DataFrame(train_data, columns=['image', 'label'],index=None)

# Shuffle the rows. random_state pins the permutation so that re-running this
# cell (or running cells out of order) yields the same row order.
train_data = train_data.sample(frac=1., random_state=seed).reset_index(drop=True)

# Peek at the first rows.
train_data.head()

In [None]:
# Decode every training image, resize it to img_size x img_size and store it in
# one pre-allocated array of shape (n_images, img_size, img_size, 3).
# The buffer is float32 instead of np.zeros' float64 default: that halves its
# memory footprint (img_to_array yields Keras' default float type, float32
# unless reconfigured, so no precision is lost).
img_size=224
X_train = np.zeros(shape=(len(train_data),img_size,img_size,3), dtype=np.float32)

for idx,fname in enumerate(tqdm(train_data.image)):
  img = image.load_img(fname,target_size=(img_size,img_size))
  # The (img_size, img_size, 3) array fits the slot directly; no batch axis needed.
  X_train[idx] = image.img_to_array(img)

In [None]:
# Collect the validation image paths together with their labels
# (0 = NORMAL folder, 1 = PNEUMONIA folder).
# The glob results are sorted because directory listing order is not guaranteed;
# the seeded shuffle below then always starts from the same sequence.
normal_cases_dir = val_dir / 'NORMAL'
pneumonia_cases_dir = val_dir / 'PNEUMONIA'
normal_cases = sorted(normal_cases_dir.glob('*.jpeg'))
pneumonia_cases = sorted(pneumonia_cases_dir.glob('*.jpeg'))

# One (path, label) record per image: normal images first, then pneumonia.
val_data = [(img, 0) for img in normal_cases] + [(img, 1) for img in pneumonia_cases]

# Convert the records to a DataFrame.
val_data = pd.DataFrame(val_data, columns=['image', 'label'],index=None)

# Shuffle the rows. random_state pins the permutation so that re-running this
# cell (or running cells out of order) yields the same row order.
val_data = val_data.sample(frac=1., random_state=seed).reset_index(drop=True)

# Peek at the first rows.
val_data.head()

In [None]:
# Decode every validation image, resize it to img_size x img_size and store it
# in one pre-allocated array of shape (n_images, img_size, img_size, 3).
# The buffer is float32 instead of np.zeros' float64 default: that halves its
# memory footprint (img_to_array yields Keras' default float type, float32
# unless reconfigured, so no precision is lost).
img_size=224
X_val = np.zeros(shape=(len(val_data),img_size,img_size,3), dtype=np.float32)

for idx,fname in enumerate(tqdm(val_data.image)):
  img = image.load_img(fname,target_size=(img_size,img_size))
  # The (img_size, img_size, 3) array fits the slot directly; no batch axis needed.
  X_val[idx] = image.img_to_array(img)

In [None]:
# Collect the test image paths together with their labels
# (0 = NORMAL folder, 1 = PNEUMONIA folder).
# The glob results are sorted because directory listing order is not guaranteed;
# the seeded shuffle below then always starts from the same sequence.
normal_cases_dir = test_dir / 'NORMAL'
pneumonia_cases_dir = test_dir / 'PNEUMONIA'
normal_cases = sorted(normal_cases_dir.glob('*.jpeg'))
pneumonia_cases = sorted(pneumonia_cases_dir.glob('*.jpeg'))

# One (path, label) record per image: normal images first, then pneumonia.
test_data = [(img, 0) for img in normal_cases] + [(img, 1) for img in pneumonia_cases]

# Convert the records to a DataFrame.
test_data = pd.DataFrame(test_data, columns=['image', 'label'],index=None)

# Shuffle the rows. random_state pins the permutation so that re-running this
# cell (or running cells out of order) yields the same row order.
test_data = test_data.sample(frac=1., random_state=seed).reset_index(drop=True)

# Peek at the first rows.
test_data.head()

In [None]:
# Decode every test image, resize it to img_size x img_size and store it in one
# pre-allocated array of shape (n_images, img_size, img_size, 3).
# The buffer is float32 instead of np.zeros' float64 default: that halves its
# memory footprint (img_to_array yields Keras' default float type, float32
# unless reconfigured, so no precision is lost).
img_size=224
X_test = np.zeros(shape=(len(test_data),img_size,img_size,3), dtype=np.float32)

for idx,fname in enumerate(tqdm(test_data.image)):
  img = image.load_img(fname,target_size=(img_size,img_size))
  # The (img_size, img_size, 3) array fits the slot directly; no batch axis needed.
  X_test[idx] = image.img_to_array(img)

# EDA

In [None]:
# Print the image-array shape and the label-column shape of each split.
for caption, features, frame in (
    ('train set shape :', X_train, train_data),
    ('valid set shape :', X_val, val_data),
    ('test set shape :', X_test, test_data),
):
    print(caption, features.shape, frame['label'].shape)

In [None]:
# Class balance of the training split. The counts are looked up by label, so
# each bar is tied to its class no matter which order value_counts() returns
# them in (it sorts by frequency, not by label).
label_counts = train_data['label'].value_counts()
plt.bar(["Pneumonia : 1","Normal : 0"],[label_counts[1], label_counts[0]])

- 폐렴 이미지가 정상 이미지보다 약 3배 많음을 관찰할 수 있음

#### train image 관찰하기

In [None]:
# Inspect a few training images.
# First five image paths of each class: pneumonia (label 1), then normal (label 0).
samples = [
    path
    for label in (1, 0)
    for path in train_data[train_data['label'] == label]['image'].iloc[:5].tolist()
]

# Draw them on a 2x5 grid; the first five panels are titled "Pneumonia",
# the remaining five "Normal".
f, ax = plt.subplots(2,5, figsize=(30,10))
for i, panel in enumerate(ax.ravel()):
    img = imread(samples[i])
    panel.imshow(img, cmap='gray')
    panel.set_title("Pneumonia" if i < 5 else "Normal")
    panel.axis('off')
    panel.set_aspect('auto')
plt.show()

- 육안으로는 폐렴 유무를 구분하기 힘듦

#### valid image 관찰하기

In [None]:
# Inspect a few validation images.
# The rows are selected with val_data's OWN label column; a mask built from
# train_data would pick validation rows according to unrelated training labels.
pneumonia_samples = (val_data[val_data['label']==1]['image'].iloc[:5]).tolist()
normal_samples = (val_data[val_data['label']==0]['image'].iloc[:5]).tolist()

# Pneumonia paths first, then normal paths.
samples = pneumonia_samples + normal_samples
del pneumonia_samples, normal_samples

# Draw them on a 2x5 grid; the first five panels are titled "Pneumonia",
# the remaining five "Normal".
f, ax = plt.subplots(2,5, figsize=(30,10))
for i in range(10):
    img = imread(samples[i])
    ax[i//5, i%5].imshow(img, cmap='gray')
    if i<5:
        ax[i//5, i%5].set_title("Pneumonia")
    else:
        ax[i//5, i%5].set_title("Normal")
    ax[i//5, i%5].axis('off')
    ax[i//5, i%5].set_aspect('auto')
plt.show()

이미지의 특징
- 가운데로 모두 정렬되어 있기 때문에 큰 변화를 주는 augmentation은 지양
- 이미지의 좌우상하가 명확함 
- 이미지의 각도가 일정함
- 흑백이며 x-ray이기 때문에 밝기도 어느정도 일정함.

=> 이러한 정보를 바탕으로 augmentation 시도

# Augmentation

In [None]:
# Settings for the augmenting generator. Every normalisation / whitening / flip
# switch is explicitly off; only rescaling, rotation, zoom and shifts are active.
augmentation_options = dict(
    rescale=1./255,                       # scale pixel values by 1/255
    featurewise_center=False,
    samplewise_center=False,
    featurewise_std_normalization=False,
    samplewise_std_normalization=False,
    zca_whitening=False,
    rotation_range=30,
    zoom_range=0.2,
    width_shift_range=0.1,
    height_shift_range=0.1,
    horizontal_flip=False,
    vertical_flip=False,
)
datagen = ImageDataGenerator(**augmentation_options)

# Generator without augmentation: rescaling only.
test_datagen = ImageDataGenerator(rescale=1./255)

- 최대 ±30도 범위에서 무작위로 회전
- 무작위로 최대 20% 확대/축소
- 가로 길이의 최대 10%만큼 수평으로 무작위 이동
- 세로 길이의 최대 10%만큼 수직으로 무작위 이동


In [None]:
# ImageDataGenerator.fit() only computes data-dependent statistics for
# feature-wise centering / std-normalisation / ZCA whitening. With all three
# disabled it would just spend time and memory processing the whole training
# array, so it is run only when one of those options is actually switched on.
if datagen.featurewise_center or datagen.featurewise_std_normalization or datagen.zca_whitening:
    datagen.fit(X_train)

# Modeling

In [None]:
# Build the CNN
def conv_block(filters):
    """Return a Sequential block of two separable convs, batch norm and max pooling.

    Args:
        filters: number of output filters used by both SeparableConv2D layers
            (kernel size 3, ReLU activation, 'same' padding).

    Returns:
        A tf.keras.Sequential holding the four layers in that order.
    """
    stages = [
        tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same')
        for _ in range(2)
    ]
    stages.append(tf.keras.layers.BatchNormalization())
    stages.append(tf.keras.layers.MaxPool2D())
    return tf.keras.Sequential(stages)
def dense_block(units, dropout_rate):
    """Return a Sequential block: Dense(ReLU) -> BatchNormalization -> Dropout.

    Args:
        units: width of the Dense layer.
        dropout_rate: rate passed to the Dropout layer.

    Returns:
        A tf.keras.Sequential holding the three layers in that order.
    """
    block = tf.keras.Sequential()
    block.add(tf.keras.layers.Dense(units, activation='relu'))
    block.add(tf.keras.layers.BatchNormalization())
    block.add(tf.keras.layers.Dropout(dropout_rate))
    return block
def build_model():
    """Assemble and compile the CNN classifier.

    Architecture: two plain 3x3 convolutions with max pooling, four separable
    conv blocks (32, 64, 128, 256 filters; dropout after the last two), then
    three dense blocks (512, 128, 64 units) and a 2-unit softmax output.
    The input shape is (img_size, img_size, 3), read from the global img_size.

    Returns:
        The compiled tf.keras.Sequential model.
    """
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(img_size, img_size, 3)),
        tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
        tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
        tf.keras.layers.MaxPool2D(),
        conv_block(32),
        conv_block(64),
        conv_block(128),
        tf.keras.layers.Dropout(0.2),
        conv_block(256),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Flatten(),
        dense_block(512, 0.7),
        dense_block(128, 0.5),
        dense_block(64, 0.3),
        tf.keras.layers.Dense(2, activation='softmax')
    ])
    # The output is a 2-way softmax and the notebook feeds one-hot targets
    # (to_categorical), so categorical cross-entropy is the matching loss; it is
    # also what build_model2 uses. Metrics are passed as a list, the documented form.
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

In [None]:
# Class balancing
# Per-label image counts of the training split (index = label value).
label_counts = train_data['label'].value_counts()
n_normal, n_pneumonia = label_counts[0], label_counts[1]

# Log of the pneumonia-to-normal count ratio (kept for reference; nothing in
# this cell consumes it).
initial_bias = np.log([n_pneumonia / n_normal])

# Weight each class inversely to its frequency, scaled by total / 2.
weight_for_0 = (1 / n_normal) * (len(train_data)) / 2.0
weight_for_1 = (1 / n_pneumonia) * (len(train_data)) / 2.0

class_weight = {0: weight_for_0, 1: weight_for_1}

print('Weight for class 0: {:.2f}'.format(weight_for_0))
print('Weight for class 1: {:.2f}'.format(weight_for_1))

In [None]:
# Label vectors (NumPy arrays) for the training and validation splits.
y_train, y_val = (frame["label"].values for frame in (train_data, val_data))

In [None]:
# Where the best weights are checkpointed, and the mini-batch size.
model_dir = data_dir / 'model'
checkpoint_path = model_dir / "cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
batch_size=32

# Stop when val_loss has not improved by at least 0.001 for 10 epochs and
# roll back to the best weights seen.
es = EarlyStopping(monitor='val_loss', min_delta=0.001, patience=10,
                   verbose=1, mode='min', baseline=None, restore_best_weights=True)

# Halve the learning rate after 8 epochs without val_loss improvement.
rlr = ReduceLROnPlateau(monitor='val_loss', patience = 8, verbose=1,factor=0.5, min_lr=0.000001)

# Keep only the weights of the best epoch on disk.
chkpt = ModelCheckpoint(filepath=checkpoint_path, save_best_only=True, save_weights_only=True)

clf = build_model()
# Training batches are augmented; validation batches are only rescaled
# (test_datagen). Randomly augmenting the validation images would make
# val_loss - which drives all three callbacks - noisy and not comparable
# between epochs. This also matches how the cross-validation loop validates.
hist=clf.fit(datagen.flow(X_train,to_categorical(y_train),batch_size=batch_size),
            validation_data=test_datagen.flow(X_val, to_categorical(y_val),batch_size=batch_size, shuffle=False),
            epochs=50,
            class_weight=class_weight,
            callbacks=[es, chkpt
                       ,rlr
                       ]
            )

In [None]:
# Per-epoch accuracy curves recorded by fit().
train_acc, val_acc = (hist.history[key] for key in ('accuracy', 'val_accuracy'))

In [None]:
# Training vs. validation accuracy per epoch (epochs numbered from 1).
epochs = range(1, len(train_acc) + 1)
for curve in (train_acc, val_acc):
    plt.plot(epochs, curve)
plt.legend(['train accuracy', 'validation accuracy'],loc=2)
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.show()

In [None]:
# Load the weights stored at checkpoint_path back into the model.
clf.load_weights(checkpoint_path)

In [None]:
# Print the layer-by-layer summary of the model.
clf.summary()

In [None]:
# Evaluate on the test split: images are only rescaled, labels are one-hot encoded.
y_test_onehot = to_categorical(test_data['label'].values)
test_batches = test_datagen.flow(X_test, y_test_onehot)
loss, acc = clf.evaluate(test_batches, verbose=2)
print('test loss: {:0.4f}'.format(loss))
print('test accuracy: {:5.2f}%'.format(100*acc))

# cross validation 시도

In [None]:
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import ZeroPadding2D,Conv2D,MaxPooling2D,Flatten,Dropout,Dense

In [None]:
# Pool the training and validation splits for cross-validation.
# Caution: X_train is overwritten with itself + X_val, so executing this cell a
# second time appends X_val again.
X_train = np.concatenate([X_train, X_val], axis=0)
y = np.concatenate([train_data["label"].values, val_data['label'].values], axis=0)
y_tst = test_data["label"].values
print(X_train.shape,y.shape)

In [None]:
def build_model2():
  """Build and compile a deeper, VGG-style CNN.

  Five convolutional stages, each a run of [ZeroPadding2D(1) -> 3x3 Conv2D(ReLU)]
  pairs followed by 2x2 max pooling, then two 4096-unit dense layers with 50%
  dropout and a 2-unit softmax output. The input shape is taken from the
  global X_train (X_train.shape[1:]).

  Returns:
    The compiled Sequential model (categorical cross-entropy, Adam at 1e-4).
  """
  # (filters, number of padded conv layers) for each stage.
  stages = [(64, 2), (128, 2), (256, 3), (512, 3), (512, 3)]

  model = Sequential()
  is_first_layer = True
  for filters, n_convs in stages:
    for _ in range(n_convs):
      if is_first_layer:
        # Only the very first layer declares the input shape.
        model.add(ZeroPadding2D((1,1),input_shape=X_train.shape[1:]))
        is_first_layer = False
      else:
        model.add(ZeroPadding2D((1,1)))
      model.add(Conv2D(filters, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2,2), strides=(2,2)))

  model.add(Flatten())
  model.add(Dense(4096, activation='relu'))
  model.add(Dropout(0.5))
  model.add(Dense(4096, activation='relu'))
  model.add(Dropout(0.5))
  model.add(Dense(2, activation='softmax'))

  # `learning_rate` is the supported argument name; the old `lr` alias is
  # deprecated and rejected by recent Keras releases.
  optimizer = Adam(learning_rate=0.0001)
  model.compile(loss="categorical_crossentropy", metrics=["accuracy"], optimizer=optimizer)

  return model

- 더욱 깊은 모델 시도

In [None]:
# Cross-validation settings: number of classes, number of folds, mini-batch size.
n_class=2
n_fold=3
batch_size=8
# Stratified, shuffled K-fold splitter seeded with the notebook-wide seed.
# NOTE(review): no import of StratifiedKFold is visible in this notebook -
# presumably sklearn.model_selection; confirm and add the import.
cv = StratifiedKFold(n_splits=n_fold, shuffle=True, random_state=seed)
# NOTE(review): `tfa` is not imported in the visible cells either (presumably
# tensorflow_addons), and this callback is only referenced from a commented-out
# line in the training loop - confirm whether it is still needed.
tqdm_callback = tfa.callbacks.TQDMProgressBar() # handy progress bar when running inside a Jupyter notebook

In [None]:
# p_val collects out-of-fold class probabilities for the pooled train+val data;
# p_tst accumulates the test probabilities averaged over the folds.
p_val = np.zeros((X_train.shape[0], n_class))
p_tst = np.zeros((X_test.shape[0], n_class))
for i, (i_trn, i_val) in enumerate(cv.split(X_train, y), 1):
    print(f'training model for CV #{i}')
    # Fresh model for every fold.
    clf = build_model2()

    # Stop after 5 epochs without a val_loss improvement of at least 0.001 and
    # restore the best weights.
    es = EarlyStopping(monitor='val_loss', min_delta=0.001, patience=5,
                       verbose=1, mode='min', baseline=None, restore_best_weights=True)

    # Halve the learning rate after 3 stagnant epochs.
    rlr = ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                        patience=3, min_lr=1e-6, mode='min', verbose=1)

    # Augmented batches for training, rescale-only batches for validation.
    hist=clf.fit(datagen.flow(X_train[i_trn],to_categorical(y[i_trn]),batch_size=batch_size),
            validation_data=test_datagen.flow(X_train[i_val], to_categorical(y[i_val])),
            epochs=50,
            class_weight=class_weight,
            callbacks=[es,
                       rlr,
                       ]
            )
    # shuffle=False is essential here: flow() shuffles by default, which would
    # hand back predictions in a random order that no longer lines up with the
    # rows of p_val[i_val] or with the test labels.
    p_val[i_val, :] = clf.predict(test_datagen.flow(X_train[i_val], shuffle=False))
    p_tst += clf.predict(test_datagen.flow(X_test, shuffle=False)) / n_fold

In [None]:
# Accuracy of the fold-averaged test predictions: the share of samples whose
# arg-max class equals the true label. Computed with NumPy because no import of
# accuracy_score is visible in this notebook; the value is the same.
np.mean(np.argmax(p_tst,axis=1) == y_tst)

- memory 이슈 발생
- 트레이닝 과정에서 확실히 더 안정적이고 좋은 성능을 보이지만 결과를 내지 못해 아쉬움.