In [None]:
# Download the Kaggle competition data (archive for competition "kdtai-1")
!kaggle competitions download -c kdtai-1

In [None]:
# Extract the downloaded archive into /content.
# The context manager closes the archive handle once extraction finishes.
import zipfile
with zipfile.ZipFile('kdtai-1.zip') as archive:
    archive.extractall('/content')

In [None]:
import numpy as np
import pandas as pd
from PIL import Image
import os
import albumentations as A
import cv2
import tensorflow as tf #tensorflow 2.9버전 사용 권장
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import KFold

In [None]:
def makeArray(file_path, required_size=(224,224)):
    """Load an image file as an RGB array resized to ``required_size``.

    Args:
        file_path: Path of the image file to open.
        required_size: Target size handed to ``Image.resize``.

    Returns:
        NumPy array of the converted, resized image
        (presumably ``(height, width, 3)`` -- TODO confirm).
    """
    # Image.open keeps the underlying file open; the context manager releases
    # it as soon as the converted/resized copy has been produced.
    with Image.open(file_path) as img:
        img_resized = img.convert("RGB").resize(required_size)
    return np.asarray(img_resized)

In [None]:
def getTrainData(data_dir="COVID_19_XRAY/train"):
    """Load every training image listed in ``labels.csv`` with its label.

    Args:
        data_dir: Directory holding ``labels.csv`` and the image files it
            references. Defaults to the path used throughout this notebook.

    Returns:
        ``(tr_x, tr_y)``: a list of image arrays produced by ``makeArray`` and
        the list of labels, in the row order of ``labels.csv``.
    """
    label_df = pd.read_csv(os.path.join(data_dir, "labels.csv"))

    tr_x, tr_y = [], []
    # Each row is expected to hold exactly (file name, label).
    for file_name, label in label_df.values:
        tr_x.append(makeArray(os.path.join(data_dir, str(file_name))))
        tr_y.append(label)

    # Report sizes only; printing the lists themselves would dump every image
    # array into the cell output.
    print("tr_x : {}, tr_y : {}".format(len(tr_x), len(tr_y)))
    return tr_x, tr_y

In [None]:
def getTestData(data_dir="COVID_19_XRAY/test"):
    """Load every image found under ``data_dir`` into one array.

    Directories and files are visited in sorted order so that the row order of
    the result is reproducible across machines and runs.

    NOTE(review): the predictions for these rows are later written to the
    submission file by position -- confirm that sorted file order matches the
    row order of ``submission.csv``.

    Args:
        data_dir: Root directory that is walked recursively for image files.

    Returns:
        NumPy array stacking the per-image arrays returned by ``makeArray``.
    """
    test_x = []
    for dir_path, dir_names, file_names in os.walk(data_dir):
        # Sorting in place makes os.walk descend into sub-directories in a
        # deterministic order as well.
        dir_names.sort()
        for file_name in sorted(file_names):
            file_path = os.path.join(dir_path, file_name)
            print(file_path)
            test_x.append(makeArray(file_path))
    return np.asarray(test_x)

In [None]:
def getAugRotate(tr_x):
    """Return a rotated copy of every image in ``tr_x``.

    Args:
        tr_x: Iterable of image arrays.

    Returns:
        NumPy array of the augmented images, in the same order as ``tr_x``.
    """
    print("getting rotate aug")
    # Build the pipeline once instead of once per image. Rotate is referenced
    # through the package's public top-level alias rather than an internal
    # module path.
    transform = A.Compose([
        A.Rotate(limit=10, border_mode=cv2.BORDER_CONSTANT, always_apply=True)
    ])
    return np.asarray([transform(image=img)['image'] for img in tr_x])

In [None]:
def getAugFlip(tr_x):
    """Return a horizontally flipped copy of every image in ``tr_x``.

    Args:
        tr_x: Iterable of image arrays.

    Returns:
        NumPy array of the augmented images, in the same order as ``tr_x``.
    """
    print("getting flip aug")
    # Build the pipeline once instead of once per image. HorizontalFlip is
    # referenced through the package's public top-level alias rather than an
    # internal module path.
    transform = A.Compose([
        A.HorizontalFlip(always_apply=True)
    ])
    return np.asarray([transform(image=img)['image'] for img in tr_x])

In [None]:
def getAugGaussNoise(tr_x):
    """Return a copy of every image in ``tr_x`` with Gaussian noise added.

    Args:
        tr_x: Iterable of image arrays.

    Returns:
        NumPy array of the augmented images, in the same order as ``tr_x``.
    """
    print("getting noise aug")
    # Build the pipeline once instead of once per image. GaussNoise is
    # referenced through the package's public top-level alias rather than an
    # internal module path.
    transform = A.Compose([
        A.GaussNoise(var_limit=(99.0, 100.0), always_apply=True)
    ])
    return np.asarray([transform(image=img)['image'] for img in tr_x])

In [None]:
def getAugTotal(tr_x):
    """Stack the original images with their rotated, flipped and noisy copies.

    Args:
        tr_x: Sequence of image arrays.

    Returns:
        One array holding, in this order, the originals, the rotated copies,
        the flipped copies and the noisy copies.
    """
    rotated = getAugRotate(tr_x)
    flipped = getAugFlip(tr_x)
    noisy = getAugGaussNoise(tr_x)
    originals = np.asarray(tr_x)

    parts = (originals, rotated, flipped, noisy)
    for part in parts:
        print(part.shape)

    # The label vector is built elsewhere and must follow this same block order.
    tr_x_total = np.concatenate(parts)

    print("tr_x_total shape : {}".format(tr_x_total.shape))
    return tr_x_total

In [None]:
# Load the training images/labels first, then the test images.
(tr_x, tr_y), test_x_arr = getTrainData(), getTestData()

# Quick sanity check: test array, its shape, and the training set sizes.
print(test_x_arr, test_x_arr.shape, len(tr_x), len(tr_y), sep="\n")

In [None]:
# Build the augmented training set: originals plus rotated, flipped and noisy copies.
tr_x_total = getAugTotal(tr_x)

In [None]:
# Encode labels as 1 for 'covid' and 0 for everything else.
tr_y_arr = np.asarray(tr_y)
tr_y_encoded = np.where(tr_y_arr=='covid', 1, 0)

# Repeat the labels four times: one copy for the original images and one for
# each of the three augmented copies stacked by getAugTotal.
tr_y_total = np.concatenate([tr_y_encoded] * 4)

print(len(tr_y_total))

In [None]:
# Cache the prepared arrays; the keys are spelled out because later cells read
# them back as arr_0 / arr_1 / arr_2.
np.savez_compressed(
    "xray.npz",
    arr_0=tr_x_total,
    arr_1=tr_y_total,
    arr_2=test_x_arr,
)

In [None]:
def generate_custom_model():
    """Build a binary classifier on top of a frozen ImageNet ResNet50.

    Returns:
        ``(model_custom, model_checkpoint)``: the compiled Keras model and a
        ``ModelCheckpoint`` callback writing the best-by-training-loss model
        to ``model_trained.h5``.
    """
    image_shape = (224,224,3)

    # Pretrained feature extractor without its classification head; frozen so
    # only the new Dense layer is trained.
    backbone = tf.keras.applications.resnet50.ResNet50(
        include_top=False,
        input_shape=image_shape,
        pooling='avg',
        weights='imagenet',
    )
    backbone.trainable = False

    inputs = tf.keras.Input(shape=image_shape)
    features = tf.keras.applications.resnet50.preprocess_input(inputs)
    features = backbone(features, training=False)
    features = tf.keras.layers.Flatten()(features)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(features)
    model_custom = tf.keras.Model(inputs, outputs)

    model_custom.summary()
    model_custom.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[tf.keras.metrics.BinaryAccuracy()],
    )

    # Keep only the weights with the lowest training loss seen so far.
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath='model_trained.h5',
        monitor='loss',
        save_best_only=True,
    )
    return model_custom, model_checkpoint

In [None]:
def get_stacking_base_datasets(tr_x, tr_y, test_x, n_folds, epochs=15, batch_size=4):
    """Produce out-of-fold and test predictions of the base model for stacking.

    For every fold a fresh model from ``generate_custom_model`` is trained on
    the training part, the best checkpoint is reloaded, and it predicts both
    the held-out part and the whole test set.

    NOTE(review): the split is a plain shuffled KFold over rows. If ``tr_x``
    contains several augmented copies of the same image, copies of one image
    can land on both sides of a split, which would make the out-of-fold
    predictions optimistic -- consider a group-aware split.

    Args:
        tr_x: Training images as a NumPy array (indexed by fold indices).
        tr_y: Training labels as a NumPy array aligned with ``tr_x``.
        test_x: Test images as a NumPy array.
        n_folds: Number of cross-validation folds.
        epochs: Training epochs per fold.
        batch_size: Mini-batch size used for training.

    Returns:
        ``(train_fold_pred, test_pred_mean)``: out-of-fold predictions of
        shape ``(len(tr_x), 1)`` and the per-fold test predictions averaged
        into shape ``(len(test_x), 1)``.
    """
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=0)
    train_fold_pred = np.zeros((tr_x.shape[0], 1))
    test_pred = np.zeros((test_x.shape[0], n_folds))

    for fold, (tr_index, val_index) in enumerate(kf.split(tr_x)):
        model_custom, model_checkpoint = generate_custom_model()
        print("K Fold number : {}".format(fold + 1))

        model_custom.fit(
            tr_x[tr_index],
            tr_y[tr_index],
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[model_checkpoint],
            verbose=1,
        )

        # Predict with the best checkpoint written during training rather
        # than with the weights of the last epoch.
        model_custom = tf.keras.models.load_model('model_trained.h5')
        train_fold_pred[val_index, :] = model_custom.predict(tr_x[val_index]).reshape(-1, 1)
        test_pred[:, fold] = model_custom.predict(test_x).reshape(-1,)

        # Release graph/session state before the next fold builds a new model.
        tf.keras.backend.clear_session()

    test_pred_mean = np.mean(test_pred, axis=1).reshape(-1, 1)
    return train_fold_pred, test_pred_mean

In [None]:
# Reload the cached dataset. The context manager closes the .npz archive once
# the three arrays have been read into memory.
with np.load("xray.npz") as data:
    tr_x, tr_y, test_x = data['arr_0'], data['arr_1'], data['arr_2']

print("tr_x shape : {}".format(tr_x.shape))
print("tr_y shape : {}".format(tr_y.shape))
print("test_x shape : {}".format(test_x.shape))

In [None]:
# 5-fold base-model predictions for ResNet50, cached for the meta-learner.
train_fold_pred, test_pred_mean = get_stacking_base_datasets(
    tr_x=tr_x, tr_y=tr_y, test_x=test_x, n_folds=5
)
np.savez_compressed(
    "resnet50_5folds.npz",
    arr_0=train_fold_pred,
    arr_1=test_pred_mean,
)

In [None]:
def generate_custom_model():
    """Build a binary classifier on top of a frozen ImageNet EfficientNetB1.

    Returns:
        ``(model_custom, model_checkpoint)``: the compiled Keras model and a
        ``ModelCheckpoint`` callback writing the best-by-training-loss model
        to ``model_trained.h5``.
    """
    image_shape = (224,224,3)

    # Pretrained feature extractor without its classification head; frozen so
    # only the new Dense layer is trained.
    backbone = tf.keras.applications.EfficientNetB1(
        include_top=False,
        input_shape=image_shape,
        pooling='avg',
        weights='imagenet',
    )
    backbone.trainable = False

    inputs = tf.keras.Input(shape=image_shape)
    features = tf.keras.applications.efficientnet.preprocess_input(inputs)
    features = backbone(features, training=False)
    features = tf.keras.layers.Flatten()(features)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(features)
    model_custom = tf.keras.Model(inputs, outputs)

    model_custom.summary()
    model_custom.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[tf.keras.metrics.BinaryAccuracy()],
    )

    # Keep only the weights with the lowest training loss seen so far.
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath='model_trained.h5',
        monitor='loss',
        save_best_only=True,
    )
    return model_custom, model_checkpoint


In [None]:
def get_stacking_base_datasets(tr_x, tr_y, test_x, n_folds, epochs=30, batch_size=4):
    """Produce out-of-fold and test predictions of the base model for stacking.

    For every fold a fresh model from ``generate_custom_model`` is trained on
    the training part, the best checkpoint is reloaded, and it predicts both
    the held-out part and the whole test set.

    NOTE(review): the split is a plain shuffled KFold over rows. If ``tr_x``
    contains several augmented copies of the same image, copies of one image
    can land on both sides of a split, which would make the out-of-fold
    predictions optimistic -- consider a group-aware split.

    Args:
        tr_x: Training images as a NumPy array (indexed by fold indices).
        tr_y: Training labels as a NumPy array aligned with ``tr_x``.
        test_x: Test images as a NumPy array.
        n_folds: Number of cross-validation folds.
        epochs: Training epochs per fold.
        batch_size: Mini-batch size used for training.

    Returns:
        ``(train_fold_pred, test_pred_mean)``: out-of-fold predictions of
        shape ``(len(tr_x), 1)`` and the per-fold test predictions averaged
        into shape ``(len(test_x), 1)``.
    """
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=0)
    train_fold_pred = np.zeros((tr_x.shape[0], 1))
    test_pred = np.zeros((test_x.shape[0], n_folds))

    for fold, (tr_index, val_index) in enumerate(kf.split(tr_x)):
        model_custom, model_checkpoint = generate_custom_model()
        print("K Fold number : {}".format(fold + 1))

        model_custom.fit(
            tr_x[tr_index],
            tr_y[tr_index],
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[model_checkpoint],
            verbose=1,
        )

        # Predict with the best checkpoint written during training rather
        # than with the weights of the last epoch.
        model_custom = tf.keras.models.load_model('model_trained.h5')
        train_fold_pred[val_index, :] = model_custom.predict(tr_x[val_index]).reshape(-1, 1)
        test_pred[:, fold] = model_custom.predict(test_x).reshape(-1,)

        # Release graph/session state before the next fold builds a new model.
        tf.keras.backend.clear_session()

    test_pred_mean = np.mean(test_pred, axis=1).reshape(-1, 1)
    return train_fold_pred, test_pred_mean

In [None]:
# Reload the cached dataset. The context manager closes the .npz archive once
# the three arrays have been read into memory.
with np.load("xray.npz") as data:
    tr_x, tr_y, test_x = data['arr_0'], data['arr_1'], data['arr_2']

print("tr_x shape : {}".format(tr_x.shape))
print("tr_y shape : {}".format(tr_y.shape))
print("test_x shape : {}".format(test_x.shape))

In [None]:
# 5-fold base-model predictions for EfficientNetB1, cached for the meta-learner.
train_fold_pred, test_pred_mean = get_stacking_base_datasets(
    tr_x=tr_x, tr_y=tr_y, test_x=test_x, n_folds=5
)
np.savez_compressed(
    "efficientnetb1_5folds.npz",
    arr_0=train_fold_pred,
    arr_1=test_pred_mean,
)

In [None]:
def generate_custom_model():
    """Build a binary classifier on top of a frozen ImageNet EfficientNetB2.

    Returns:
        ``(model_custom, model_checkpoint)``: the compiled Keras model and a
        ``ModelCheckpoint`` callback writing the best-by-training-loss model
        to ``model_trained.h5``.
    """
    image_shape = (224,224,3)

    # Pretrained feature extractor without its classification head; frozen so
    # only the new Dense layer is trained.
    backbone = tf.keras.applications.EfficientNetB2(
        include_top=False,
        input_shape=image_shape,
        pooling='avg',
        weights='imagenet',
    )
    backbone.trainable = False

    inputs = tf.keras.Input(shape=image_shape)
    features = tf.keras.applications.efficientnet.preprocess_input(inputs)
    features = backbone(features, training=False)
    features = tf.keras.layers.Flatten()(features)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(features)
    model_custom = tf.keras.Model(inputs, outputs)

    model_custom.summary()
    model_custom.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[tf.keras.metrics.BinaryAccuracy()],
    )

    # Keep only the weights with the lowest training loss seen so far.
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath='model_trained.h5',
        monitor='loss',
        save_best_only=True,
    )
    return model_custom, model_checkpoint

In [None]:
def get_stacking_base_datasets(tr_x, tr_y, test_x, n_folds, epochs=15, batch_size=4):
    """Produce out-of-fold and test predictions of the base model for stacking.

    For every fold a fresh model from ``generate_custom_model`` is trained on
    the training part, the best checkpoint is reloaded, and it predicts both
    the held-out part and the whole test set.

    NOTE(review): the split is a plain shuffled KFold over rows. If ``tr_x``
    contains several augmented copies of the same image, copies of one image
    can land on both sides of a split, which would make the out-of-fold
    predictions optimistic -- consider a group-aware split.

    Args:
        tr_x: Training images as a NumPy array (indexed by fold indices).
        tr_y: Training labels as a NumPy array aligned with ``tr_x``.
        test_x: Test images as a NumPy array.
        n_folds: Number of cross-validation folds.
        epochs: Training epochs per fold.
        batch_size: Mini-batch size used for training.

    Returns:
        ``(train_fold_pred, test_pred_mean)``: out-of-fold predictions of
        shape ``(len(tr_x), 1)`` and the per-fold test predictions averaged
        into shape ``(len(test_x), 1)``.
    """
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=0)
    train_fold_pred = np.zeros((tr_x.shape[0], 1))
    test_pred = np.zeros((test_x.shape[0], n_folds))

    for fold, (tr_index, val_index) in enumerate(kf.split(tr_x)):
        model_custom, model_checkpoint = generate_custom_model()
        print("K Fold number : {}".format(fold + 1))

        model_custom.fit(
            tr_x[tr_index],
            tr_y[tr_index],
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[model_checkpoint],
            verbose=1,
        )

        # Predict with the best checkpoint written during training rather
        # than with the weights of the last epoch.
        model_custom = tf.keras.models.load_model('model_trained.h5')
        train_fold_pred[val_index, :] = model_custom.predict(tr_x[val_index]).reshape(-1, 1)
        test_pred[:, fold] = model_custom.predict(test_x).reshape(-1,)

        # Release graph/session state before the next fold builds a new model.
        tf.keras.backend.clear_session()

    test_pred_mean = np.mean(test_pred, axis=1).reshape(-1, 1)
    return train_fold_pred, test_pred_mean

In [None]:
# Reload the cached dataset. The context manager closes the .npz archive once
# the three arrays have been read into memory.
with np.load("xray.npz") as data:
    tr_x, tr_y, test_x = data['arr_0'], data['arr_1'], data['arr_2']

print("tr_x shape : {}".format(tr_x.shape))
print("tr_y shape : {}".format(tr_y.shape))
print("test_x shape : {}".format(test_x.shape))

In [None]:
# 5-fold base-model predictions for EfficientNetB2, cached for the meta-learner.
train_fold_pred, test_pred_mean = get_stacking_base_datasets(
    tr_x=tr_x, tr_y=tr_y, test_x=test_x, n_folds=5
)
np.savez_compressed(
    "efficientnetb2_5folds.npz",
    arr_0=train_fold_pred,
    arr_1=test_pred_mean,
)

In [None]:
def generate_custom_model():
    """Build a binary classifier on top of a frozen ImageNet EfficientNetB3.

    Returns:
        ``(model_custom, model_checkpoint)``: the compiled Keras model and a
        ``ModelCheckpoint`` callback writing the best-by-training-loss model
        to ``model_trained.h5``.
    """
    image_shape = (224,224,3)

    # Pretrained feature extractor without its classification head; frozen so
    # only the new Dense layer is trained.
    backbone = tf.keras.applications.EfficientNetB3(
        include_top=False,
        input_shape=image_shape,
        pooling='avg',
        weights='imagenet',
    )
    backbone.trainable = False

    inputs = tf.keras.Input(shape=image_shape)
    features = tf.keras.applications.efficientnet.preprocess_input(inputs)
    features = backbone(features, training=False)
    features = tf.keras.layers.Flatten()(features)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(features)
    model_custom = tf.keras.Model(inputs, outputs)

    model_custom.summary()
    model_custom.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[tf.keras.metrics.BinaryAccuracy()],
    )

    # Keep only the weights with the lowest training loss seen so far.
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath='model_trained.h5',
        monitor='loss',
        save_best_only=True,
    )
    return model_custom, model_checkpoint

In [None]:
def get_stacking_base_datasets(tr_x, tr_y, test_x, n_folds, epochs=30, batch_size=4):
    """Produce out-of-fold and test predictions of the base model for stacking.

    For every fold a fresh model from ``generate_custom_model`` is trained on
    the training part, the best checkpoint is reloaded, and it predicts both
    the held-out part and the whole test set.

    NOTE(review): the split is a plain shuffled KFold over rows. If ``tr_x``
    contains several augmented copies of the same image, copies of one image
    can land on both sides of a split, which would make the out-of-fold
    predictions optimistic -- consider a group-aware split.

    Args:
        tr_x: Training images as a NumPy array (indexed by fold indices).
        tr_y: Training labels as a NumPy array aligned with ``tr_x``.
        test_x: Test images as a NumPy array.
        n_folds: Number of cross-validation folds.
        epochs: Training epochs per fold.
        batch_size: Mini-batch size used for training.

    Returns:
        ``(train_fold_pred, test_pred_mean)``: out-of-fold predictions of
        shape ``(len(tr_x), 1)`` and the per-fold test predictions averaged
        into shape ``(len(test_x), 1)``.
    """
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=0)
    train_fold_pred = np.zeros((tr_x.shape[0], 1))
    test_pred = np.zeros((test_x.shape[0], n_folds))

    for fold, (tr_index, val_index) in enumerate(kf.split(tr_x)):
        model_custom, model_checkpoint = generate_custom_model()
        print("K Fold number : {}".format(fold + 1))

        model_custom.fit(
            tr_x[tr_index],
            tr_y[tr_index],
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[model_checkpoint],
            verbose=1,
        )

        # Predict with the best checkpoint written during training rather
        # than with the weights of the last epoch.
        model_custom = tf.keras.models.load_model('model_trained.h5')
        train_fold_pred[val_index, :] = model_custom.predict(tr_x[val_index]).reshape(-1, 1)
        test_pred[:, fold] = model_custom.predict(test_x).reshape(-1,)

        # Release graph/session state before the next fold builds a new model.
        tf.keras.backend.clear_session()

    test_pred_mean = np.mean(test_pred, axis=1).reshape(-1, 1)
    return train_fold_pred, test_pred_mean

In [None]:
# Reload the cached dataset. The context manager closes the .npz archive once
# the three arrays have been read into memory.
with np.load("xray.npz") as data:
    tr_x, tr_y, test_x = data['arr_0'], data['arr_1'], data['arr_2']

print("tr_x shape : {}".format(tr_x.shape))
print("tr_y shape : {}".format(tr_y.shape))
print("test_x shape : {}".format(test_x.shape))

In [None]:
# 5-fold base-model predictions for EfficientNetB3, cached for the meta-learner.
train_fold_pred, test_pred_mean = get_stacking_base_datasets(
    tr_x=tr_x, tr_y=tr_y, test_x=test_x, n_folds=5
)
np.savez_compressed(
    "efficientnetb3_5folds.npz",
    arr_0=train_fold_pred,
    arr_1=test_pred_mean,
)

In [None]:
# Only the labels are needed for the meta-learner; close the archive after
# reading them.
with np.load("xray.npz") as data:
    tr_y = data['arr_1']

In [None]:
# Load the ResNet50 stacking predictions. The file name matches what the
# ResNet50 stacking cell of this notebook writes ("resnet50_5folds.npz"); the
# archive is closed once both arrays are read.
with np.load("resnet50_5folds.npz") as data:
    tr_resnet50, test_resnet50 = data['arr_0'], data['arr_1']

In [None]:
# Load the EfficientNetB1 stacking predictions; the archive is closed once
# both arrays are read.
with np.load("efficientnetb1_5folds.npz") as data:
    tr_efficientnetb1, test_efficientnetb1 = data['arr_0'], data['arr_1']

In [None]:
# Load the EfficientNetB2 stacking predictions; the archive is closed once
# both arrays are read.
with np.load("efficientnetb2_5folds.npz") as data:
    tr_efficientnetb2, test_efficientnetb2 = data['arr_0'], data['arr_1']

In [None]:
# Load the EfficientNetB3 stacking predictions; the archive is closed once
# both arrays are read.
with np.load("efficientnetb3_5folds.npz") as data:
    tr_efficientnetb3, test_efficientnetb3 = data['arr_0'], data['arr_1']

In [None]:
# One column per base model, in the same model order for train and test.
base_train_preds = (
    tr_resnet50,
    tr_efficientnetb1,
    tr_efficientnetb2,
    tr_efficientnetb3,
)
base_test_preds = (
    test_resnet50,
    test_efficientnetb1,
    test_efficientnetb2,
    test_efficientnetb3,
)

tr_x = np.concatenate(base_train_preds, axis=1)
test_x = np.concatenate(base_test_preds, axis=1)

In [None]:
# Meta-learner: logistic regression over the stacked base-model predictions.
model_lr = LogisticRegression().fit(tr_x, tr_y)
final_preds = model_lr.predict(test_x)

In [None]:
# Map the numeric class back to its label: 1 -> 'covid', anything else -> 'normal'.
final_preds_str = np.where(
    final_preds==1.,
    'covid',
    'normal',
)

In [None]:
# Fill the submission template with the predicted labels and save it.
# NOTE(review): labels are assigned by position -- confirm that the row order
# of submission.csv matches the order in which the test images were loaded.
submission_template_path = "COVID_19_XRAY/submission.csv"
final_submission_path = "final.csv"

sbmit_df = pd.read_csv(submission_template_path)
sbmit_df['label'] = final_preds_str
sbmit_df.to_csv(final_submission_path, index=False)