In [None]:
# Install the gdown CLI, then use it to download the file identified by the given ID.
# NOTE(review): presumably this is the 'Train&Validation.zip' archive extracted in the next cell — confirm.
!pip install gdown
!gdown --id 1N0jmtIJkzRrPJHpXpk8mlM5JPirL6_Gb

In [None]:
# Extract the downloaded archive into /kaggle/working/Data.
!unzip '/kaggle/working/Train&Validation.zip' -d '/kaggle/working/Data'

In [None]:
import os
import shutil
from tqdm import tqdm
import random
import matplotlib.pyplot as plt
import numpy as np
from scipy import ndimage
import tensorflow as tf

In [None]:
# Delete the archive now that it has been extracted.
# The existence check makes the cell safe to re-run: a bare os.remove raises
# FileNotFoundError once the file is already gone.
archive_path = '/kaggle/working/Train&Validation.zip'
if os.path.exists(archive_path):
    os.remove(archive_path)

In [None]:
# Reorganise the flat folder of extracted files into one folder per patient:
#   /kaggle/working/Patients/<Normal|COVID>/<patient id>/<file>
data_dir = '/kaggle/working/Data'
patients_dir = '/kaggle/working/Patients'

# Substring looked for in the file name -> destination class folder.
# Insertion order matters: 'normal' is tested before 'covid'.
class_dirs = {
    'normal': os.path.join(patients_dir, 'Normal'),
    'covid': os.path.join(patients_dir, 'COVID'),
}

# makedirs(exist_ok=True) also creates the parent folder and is safe to re-run.
for class_dir in class_dirs.values():
    os.makedirs(class_dir, exist_ok=True)

for item in tqdm(os.listdir(data_dir)):
    # The patient id is the text from 'patient' up to (not including) the next '_'.
    idx1 = item.find('patient')
    idx2 = item.find('_', idx1) if idx1 != -1 else -1
    class_dir = next((path for key, path in class_dirs.items() if key in item), None)

    # Skip anything that does not follow the expected naming scheme instead of
    # moving it into a stale or undefined destination.
    if idx2 == -1 or class_dir is None:
        tqdm.write('skipping unrecognised file: ' + item)
        continue

    patient_num = item[idx1:idx2]
    src_dir = os.path.join(data_dir, item)
    des_dir = os.path.join(class_dir, patient_num)

    os.makedirs(des_dir, exist_ok=True)
    shutil.move(src_dir, des_dir)

In [None]:
def resize_volume(img, desired_depth=128, desired_width=128, desired_height=128):
    """Resample a volume to a fixed size along its first three axes.

    Args:
        img: Array-like with at least three dimensions. Axes 0, 1 and 2 are
            resized; any further axes (e.g. a trailing channel axis) keep
            their length.
        desired_depth: Target length of axis 0.
        desired_width: Target length of axis 1.
        desired_height: Target length of axis 2.

    Returns:
        The resampled array produced by ``scipy.ndimage.zoom`` with its
        default interpolation settings.

    Raises:
        ValueError: If ``img`` has fewer than three dimensions or one of the
            first three axes is empty.
    """
    img = np.asarray(img)
    if img.ndim < 3:
        raise ValueError(
            'resize_volume expects at least 3 dimensions, got %d' % img.ndim
        )

    current_sizes = img.shape[:3]
    if 0 in current_sizes:
        raise ValueError(
            'cannot resize a volume with an empty axis: shape %s' % (img.shape,)
        )

    desired_sizes = (desired_depth, desired_width, desired_height)
    # Zoom factor per resized axis; a factor of 1 leaves the remaining axes alone.
    factors = tuple(
        desired / current for desired, current in zip(desired_sizes, current_sizes)
    ) + (1,) * (img.ndim - 3)
    return ndimage.zoom(img, factors)




def plot_slices(num_rows, num_cols, data):
    """Show the leading entries of ``data`` as images on a grid.

    A new 10x7 figure is created and at most ``num_rows * num_cols`` entries
    are drawn, in order, one per grid cell with the axes hidden. Nothing is
    returned.
    """
    figure = plt.figure(figsize=(10, 7))

    # Never draw more panels than there are entries or grid cells.
    n_panels = min(len(data), num_rows * num_cols)

    for position in range(1, n_panels + 1):
        # add_subplot makes the new axes current, so the pyplot calls below
        # draw into it.
        figure.add_subplot(num_rows, num_cols, position)
        plt.imshow(data[position - 1])
        plt.axis('off')
            
            
      
    
    
def prepare_3D_samples(main_dir, inp_lst):
    """Build one resized volume per sub-folder named in ``inp_lst``.

    For each name, every file in ``main_dir/<name>`` is read with
    ``plt.imread`` in lexicographic file-name order, the images are stacked
    along a new leading axis, a trailing axis of length 1 is appended, and the
    result is passed through ``resize_volume``.

    Returns:
        ``np.array`` of the resized volumes, in the order of ``inp_lst``.
    """
    volumes = []
    for patient in tqdm(inp_lst):
        patient_dir = os.path.join(main_dir, patient)
        slice_names = sorted(os.listdir(patient_dir))

        slices = [
            plt.imread(os.path.join(patient_dir, slice_name))
            for slice_name in slice_names
        ]

        # expand_dims converts the list to an array and appends the last axis.
        stacked = np.expand_dims(slices, axis=-1)
        volumes.append(resize_volume(stacked))
    return np.array(volumes)




def rotate(volume):
    """Rotate the volume by an angle picked at random from a fixed set.

    The rotation runs in NumPy/SciPy through ``tf.numpy_function`` so it can
    be used inside a ``tf.data`` pipeline.

    Args:
        volume: Tensor (or array) holding one volume.

    Returns:
        A ``tf.float32`` tensor with the same shape as ``volume``.
    """

    def scipy_rotate(volume):
        # define some rotation angles
        angles = [-20, -10, -5, 0, 5, 10, 20]
        # pick angles at random
        angle = random.choice(angles)
        # rotate volume; reshape=False keeps the output shape equal to the input's
        # NOTE(review): ndimage.rotate defaults to the plane of axes (1, 0). If
        # volumes are laid out (slice, row, col, channel), in-slice rotation
        # would need axes=(1, 2) — confirm the intended plane.
        rotated = ndimage.rotate(volume, angle, reshape=False)
        # tf.numpy_function requires the returned dtype to match Tout exactly.
        return rotated.astype(np.float32, copy=False)

    augmented_volume = tf.numpy_function(scipy_rotate, [volume], tf.float32)
    # numpy_function outputs carry no static shape; restore it so downstream
    # layers and batching see a fully defined shape.
    static_shape = getattr(volume, 'shape', None)
    if static_shape is not None:
        augmented_volume.set_shape(static_shape)
    return augmented_volume

In [None]:
# Folders holding one sub-folder per patient for each class.
cov_main_dir = '/kaggle/working/Patients/COVID'
norm_main_dir = '/kaggle/working/Patients/Normal'

# Sort first so the seeded shuffle gives the same order on every run.
cov_lst = sorted(os.listdir(cov_main_dir))
random.Random(1).shuffle(cov_lst)

norm_lst = sorted(os.listdir(norm_main_dir))
random.Random(1).shuffle(norm_lst)

# Quick look at how many patients there are and the first few ids per class.
for patient_ids in (cov_lst, norm_lst):
    print(len(patient_ids))
    print(patient_ids[:5])
print('_________')

# Read and resize every patient's images into one array per class.
covid_volume = prepare_3D_samples(cov_main_dir, cov_lst)
normal_volume = prepare_3D_samples(norm_main_dir, norm_lst)
print('_________')

print('covid_volume:', np.shape(covid_volume))
print('normal_volume:', np.shape(normal_volume))

In [None]:
# One label per volume: 1 for normal, 0 for COVID.
normal_labels = np.array([1] * len(normal_volume))
covid_labels = np.array([0] * len(covid_volume))

In [None]:
# Number of leading volumes of each class that go to the training set;
# everything after that index goes to the validation set.
N_COVID_TRAIN = 76
N_NORMAL_TRAIN = 225

x_train = np.concatenate(
    [covid_volume[:N_COVID_TRAIN], normal_volume[:N_NORMAL_TRAIN]], axis=0
)
y_train = np.concatenate(
    [covid_labels[:N_COVID_TRAIN], normal_labels[:N_NORMAL_TRAIN]], axis=0
)

x_valid = np.concatenate(
    [covid_volume[N_COVID_TRAIN:], normal_volume[N_NORMAL_TRAIN:]], axis=0
)
y_valid = np.concatenate(
    [covid_labels[N_COVID_TRAIN:], normal_labels[N_NORMAL_TRAIN:]], axis=0
)

In [None]:
# Sanity check: shapes of the training and validation arrays.
print('x_train:', x_train.shape)
print('y_train:', y_train.shape)
print('x_valid:', x_valid.shape)
print('y_valid:', y_valid.shape)

In [None]:
# Show the label of one training sample and its first 80 slices on an 8x10 grid.
sample_idx = 10

print('label:', y_train[sample_idx])
print('label 1 is normal and label 0 is covid 19')

plot_slices(8, 10, x_train[sample_idx])

In [None]:
def train_preprocessing(volume, label):
    """Scale a training volume and apply the random rotation augmentation.

    The volume is cast to float32 and divided by 255, then passed through
    ``rotate``. The label is returned untouched.
    """
    # NOTE(review): dividing by 255 assumes 0-255 intensities — confirm the
    # value range of the loaded images.
    as_float = tf.cast(volume, tf.float32)
    scaled = as_float / 255.0
    return rotate(scaled), label


def validation_preprocessing(volume, label):
    """Scale a validation volume; no augmentation is applied.

    The volume is cast to float32 and divided by 255. The label is returned
    untouched.
    """
    as_float = tf.cast(volume, tf.float32)
    scaled = as_float / 255.0
    return scaled, label

In [None]:
# Number of volumes per batch.
batch_size = 2

# Wrap the in-memory arrays as tf.data datasets of (volume, label) pairs.
train_loader = tf.data.Dataset.from_tensor_slices((x_train, y_train))
validation_loader = tf.data.Dataset.from_tensor_slices((x_valid, y_valid))

# Training pipeline: shuffle over the whole set, rescale and augment on the
# fly, then batch and prefetch one batch ahead.
train_dataset = train_loader.shuffle(len(x_train))
train_dataset = train_dataset.map(train_preprocessing)
train_dataset = train_dataset.batch(batch_size).prefetch(1)

# Validation pipeline: same steps, but only rescaling (no augmentation).
validation_dataset = validation_loader.shuffle(len(x_valid))
validation_dataset = validation_dataset.map(validation_preprocessing)
validation_dataset = validation_dataset.batch(batch_size).prefetch(1)


In [None]:
from keras.layers import Flatten, Activation, RepeatVector, Permute, Multiply, Lambda, Dense

def SE(inputs, ratio = 8):
    """Squeeze-and-excitation: rescale each channel of ``inputs`` by a learned gate.

    The input is globally average-pooled over its three spatial axes, passed
    through a bottleneck Dense (ReLU) and a Dense with one sigmoid unit per
    channel, and the resulting per-channel weights are multiplied back onto
    ``inputs``.

    Args:
        inputs: 5-D Keras tensor with a statically known last (channel) axis.
        ratio: Channel reduction factor of the bottleneck layer.

    Returns:
        Keras tensor with the same shape as ``inputs``.

    Raises:
        ValueError: If the channel dimension of ``inputs`` is not known.
    """
    layers = tf.keras.layers

    c = inputs.shape[-1]
    if c is None:
        raise ValueError('SE requires a statically known channel dimension')
    c = int(c)

    # Keep at least one unit: c // ratio is 0 when there are fewer channels
    # than `ratio`, and a Dense layer cannot have zero units.
    bottleneck_units = max(c // ratio, 1)

    x = layers.GlobalAveragePooling3D()(inputs)
    x = layers.Dense(bottleneck_units, activation='relu', use_bias=False)(x)
    x = layers.Dense(c, activation='sigmoid', use_bias=False)(x)
    # tf.keras.layers is used throughout so every layer comes from the same
    # Keras implementation as the rest of the model.
    x = layers.Multiply()([inputs, x])
    return x

def attBlock(input_tensor):
    """Single attention head built from three one-unit Dense projections.

    ``k``, ``q`` and ``v`` are Dense(1) projections of ``input_tensor`` over
    its last axis. The element-wise product ``k * q`` is turned into weights
    with a softmax and those weights are multiplied onto ``v``.

    The softmax is taken over the position axes (every axis between the batch
    axis and the last axis). The projections leave a last axis of length 1,
    and a softmax over a length-1 axis is always 1, which would make the
    weights constant and leave ``k`` and ``q`` without any effect.

    Args:
        input_tensor: Keras tensor of rank >= 2.

    Returns:
        Keras tensor shaped like ``input_tensor`` with a last axis of length 1.
    """
    layers = tf.keras.layers

    k = layers.Dense(1)(input_tensor)
    q = layers.Dense(1)(input_tensor)
    v = layers.Dense(1)(input_tensor)

    scores = layers.Multiply()([k, q])

    # Axes 1 .. rank-2 are the positions; fall back to the last axis for a
    # rank-2 input, which has no position axes.
    rank = len(input_tensor.shape)
    position_axes = list(range(1, rank - 1)) or [-1]
    alpha = layers.Softmax(axis=position_axes)(scores)

    c = layers.Multiply()([alpha, v])
    return c

def SE_resBlock(input_tensor, n_filters):
    """Pre-activation residual block with a squeeze-and-excitation stage.

    Main path: two rounds of BatchNorm -> ReLU -> 3x3x3 'same' convolution
    with ``n_filters`` filters, followed by ``SE``. Shortcut path: a 1x1x1
    convolution of ``input_tensor`` to ``n_filters`` channels. The two paths
    are added.

    Returns:
        ``(model, output_tensor)``: a Keras model mapping ``input_tensor`` to
        the block output, and the output tensor itself.
    """
    features = input_tensor
    for _ in range(2):
        features = tf.keras.layers.BatchNormalization()(features)
        features = tf.keras.activations.relu(features)
        features = tf.keras.layers.Conv3D(
            filters=n_filters, kernel_size=(3,3,3), padding='same'
        )(features)
    features = SE(features)

    # Project the input so its channel count matches the main path.
    shortcut = tf.keras.layers.Conv3D(filters=n_filters, kernel_size=(1,1,1))(input_tensor)
    output_tensor = features + shortcut

    block_model = tf.keras.models.Model(input_tensor, output_tensor)
    return block_model, output_tensor

def model_arch():
    """Build the 3-D classification network.

    Layout, in order:
      * input of shape (128, 128, 128, 1);
      * stem: two rounds of 3x3x3 'same' Conv3D(16) -> BatchNorm -> ReLU;
      * three ``SE_resBlock`` stages with 32 filters;
      * 7x7x7 max pooling;
      * three ``attBlock`` heads on the pooled features, concatenated;
      * Flatten -> Dense(128, relu) -> Dense(16, relu) -> Dropout(0.25)
        -> Dense(1, sigmoid).

    Returns:
        An uncompiled ``tf.keras`` model.
    """
    # Every layer is taken from tf.keras so the model is not assembled from
    # two different Keras packages.
    layers = tf.keras.layers

    inp = tf.keras.Input((128,128,128,1))

    x = inp
    for _ in range(2):
        x = layers.Conv3D(filters=16 , kernel_size=(3,3,3), padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = tf.keras.activations.relu(x)

    # Only the output tensor of each block is needed; the per-block model that
    # SE_resBlock also returns is discarded.
    for _ in range(3):
        _, x = SE_resBlock(x, 32)

    x = layers.MaxPooling3D((7,7,7))(x)

    heads = [attBlock(x) for _ in range(3)]
    x = layers.Concatenate()(heads)

    x = layers.Flatten()(x)
    x = layers.Dense(128, activation = 'relu')(x)
    x = layers.Dense(16, activation = 'relu')(x)
    x = layers.Dropout(0.25)(x)
    x = layers.Dense(1, activation = 'sigmoid')(x)

    return tf.keras.models.Model(inp, x)

In [None]:
# Build the network and render a diagram of its layer graph.
MASERes = model_arch()
tf.keras.utils.plot_model(MASERes)

In [None]:
# Print the layer-by-layer summary of the model.
MASERes.summary()

In [None]:
# Build a fresh model instance and compile it for training.
# The metric is registered as 'acc', so the training history uses the keys
# 'acc' and 'val_acc'.
MASERes = model_arch()
train_optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-6)
MASERes.compile(
    optimizer=train_optimizer,
    loss='binary_crossentropy',
    metrics = ['acc'],
)

In [None]:
# Save the weights of the epoch with the highest validation accuracy.
# The path names an actual weights file; a bare directory would produce
# checkpoint files with an empty base name.
checkpoint_filepath = '/kaggle/working/MASERes_best.weights.h5'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    # The model is compiled with metrics=['acc'], so the validation metric is
    # logged as 'val_acc'. Monitoring 'val_accuracy' would never find a value
    # and no checkpoint would be written.
    monitor='val_acc',
    mode='max',
    save_best_only=True)

In [None]:
# Train for 100 epochs on the training pipeline, validating on the validation
# pipeline, with the checkpoint callback attached. The returned History
# object is kept for plotting.
history = MASERes.fit(train_dataset,
                    validation_data = validation_dataset,
                    epochs = 100,
                    verbose = 1,
                    callbacks=[model_checkpoint_callback])

In [None]:
# Save the trained model, then plot the loss and accuracy curves.

MASERes.save('/kaggle/working/MASERes.h5')

training_log = history.history
epochs = range(len(training_log['loss']))

# One figure per entry: (figure title, ((history key, legend label), ...)).
curve_specs = (
    ('loss', (('loss', 'Training Loss'), ('val_loss', 'Validation Loss'))),
    ('accuracy', (('acc', 'Training Accuracy'), ('val_acc', 'Validation Accuracy'))),
)

for figure_index, (figure_title, series) in enumerate(curve_specs):
    # The first set of curves goes on the implicit figure; later ones get a new figure.
    if figure_index > 0:
        plt.figure()
    for history_key, legend_label in series:
        plt.plot(epochs , training_log[history_key], label=legend_label)
    plt.title(figure_title)
    plt.legend()

plt.show()

In [None]:
# Re-compile the trained model with the full set of evaluation metrics.
# The order of this list fixes the order of the values returned by evaluate()
# after the loss: accuracy, AUC, precision, recall, TP, TN, FP, FN.
eval_metrics = [
    'acc',
    tf.keras.metrics.AUC(),
    tf.keras.metrics.Precision(),
    tf.keras.metrics.Recall(),
    tf.keras.metrics.TruePositives(),
    tf.keras.metrics.TrueNegatives(),
    tf.keras.metrics.FalsePositives(),
    tf.keras.metrics.FalseNegatives(),
]

MASERes.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-6),
    loss='binary_crossentropy',
    metrics=eval_metrics,
)

In [None]:
# Evaluate on the validation pipeline; `a` holds the loss followed by the
# compiled metrics, in the order they were passed to compile().
a = MASERes.evaluate(validation_dataset)

In [None]:
# Name the evaluate() outputs instead of indexing them positionally. The order
# is the loss followed by the metrics in the order given to compile(); the
# unpacking fails loudly if that list ever changes length.
(eval_loss, eval_acc, eval_auc, eval_precision, eval_recall,
 true_pos, true_neg, false_pos, false_neg) = a


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Confusion matrix laid out as [[TN, FP], [FN, TP]].
# The positive class is label 1, which this notebook assigns to normal volumes.
conf_mx = [[true_neg, false_pos], [false_neg, true_pos]]
mx = np.array(conf_mx)

# Guarded so a model that never predicts the positive class reports 0 instead
# of raising ZeroDivisionError.
fscore = _safe_ratio(2 * eval_precision * eval_recall, eval_precision + eval_recall)
spc = _safe_ratio(mx[0, 0] * 1.0, mx[0, 0] + mx[0, 1])
sen = _safe_ratio(mx[1, 1] * 1.0, mx[1, 1] + mx[1, 0])

print('accuracy:',np.round(eval_acc*100,2),'%')
print('precision:',np.round(eval_precision*100,2),'%')
print('recall:',np.round(eval_recall*100,2),'%')
print('Sensitivity:',np.round(sen*100,2),'%')
print('Specificity:',np.round(spc*100,2),'%')
print('f1-score:',np.round(fscore*100,2),'%')
print('AUC:',np.round(eval_auc*100,2),'%')

In [None]:
# Display a link to the saved model file in the notebook output.
from IPython.display import FileLink
FileLink(r'/kaggle/working/MASERes.h5')