In [None]:
# Before running, change the dataset paths used below to point at your copy of the data.
# The dataset is available here: https://www.kaggle.com/datasets/jamilurrahman/covrecker2/data

In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from keras.layers import Conv2D, MaxPooling2D,GlobalAveragePooling2D
from keras.layers import Activation, Dropout, BatchNormalization, Flatten, Dense, AvgPool2D,MaxPool2D, GaussianNoise
from tensorflow.keras import Sequential, Model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import os
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix, plot_confusion_matrix
import matplotlib.pyplot as plt
import pandas as pd
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.applications import EfficientNetB4, InceptionV3, MobileNetV2, ResNet50, VGG19, VGG16, Xception

In [None]:
# Notebook-wide settings, read as globals by the functions defined below.
shape = 350  # side length in pixels: images are resized to (shape, shape) and the model input is (shape, shape, 3)
epoch = 50  # number of epochs passed to model.fit by train_model
train_step = 10  # NOTE(review): not read by any function in the visible notebook
val_step = int(train_step*0.6)  # NOTE(review): not read by any function in the visible notebook
lr = 0.001  # NOTE(review): not read anywhere visible; create_model compiles with optimizer='adam'
split = 0.2  # NOTE(review): not read by any function in the visible notebook
batch = 48  # NOTE(review): not read by any function in the visible notebook

In [None]:
def generate_data(df, data_dir, batch_size=48, val_batch_size=32,
                  validation_split=0.2, seed=None):
    """Create training and validation image generators from one label table.

    The table is shuffled once and the SAME shuffled frame is handed to both
    generators, so the 'training' and 'validation' subsets are complementary
    slices of the same row order.

    Args:
        df: DataFrame with a 'path' column (file names relative to
            ``data_dir``) and a 'labels' column (class name per image).
        data_dir: Directory that contains the image files.
        batch_size: Batch size of the training generator (default 48).
        val_batch_size: Batch size of the validation generator (default 32).
        validation_split: Fraction of rows reserved for validation
            (default 0.2). Both generators must use the same value.
        seed: Optional integer seed for the row shuffle and for the
            generators' own shuffling/augmentation. ``None`` (default) keeps
            the previous non-deterministic behaviour.

    Returns:
        ``(train_imagegen, valid_imagegen)``. Only the training generator
        applies augmentation (random zoom of up to 20%). Images are resized to
        ``(shape, shape)`` using the notebook-level ``shape`` setting.
    """
    d = df.sample(frac=1, random_state=seed).reset_index(drop=True)

    image = keras.preprocessing.image
    # Options shared by both generators; only subset and batch size differ.
    common = dict(
        directory=data_dir,
        x_col='path',
        y_col='labels',
        target_size=(shape, shape),
        seed=seed,
    )

    train_datagen = image.ImageDataGenerator(
        zoom_range=0.2,
        validation_split=validation_split,
    )
    train_imagegen = train_datagen.flow_from_dataframe(
        d, subset='training', batch_size=batch_size, **common
    )

    # No augmentation on the validation side.
    valid_datagen = image.ImageDataGenerator(validation_split=validation_split)
    valid_imagegen = valid_datagen.flow_from_dataframe(
        d, subset='validation', batch_size=val_batch_size, **common
    )
    return train_imagegen, valid_imagegen

In [None]:
def relu6(x):
    """Clamp a scalar to the closed interval [0, 6]."""
    # Lower bound first, then upper bound.
    lower = x if x > 0 else 0
    return 6 if 6 < lower else lower

In [None]:
def mb_block(x, expand=64, squeeze=16, strides=1, bneck_depth=3, se=False):
    """Expand -> depthwise -> project block with an optional residual add.

    Args:
        x: Input feature-map tensor.
        expand: Channel count of the 1x1 expansion convolution.
        squeeze: Channel count of the 1x1 projection convolution.
        strides: Stride of the depthwise convolution.
        bneck_depth: Kernel size of the depthwise convolution.
        se: When true, ``squeeze_excite_block(..., ratio=4)`` is applied
            after the depthwise stage.
            NOTE(review): ``squeeze_excite_block`` is not defined in the
            visible part of this notebook; confirm it exists before se=True.

    Returns:
        The projected tensor, added to ``x`` when ``strides == 1`` and the
        channel counts (axis 3) of input and output agree.
    """
    # 1x1 expansion
    expanded = tf.keras.layers.Conv2D(expand, (1,1), strides=1)(x)
    expanded = tf.keras.layers.BatchNormalization()(expanded)
    expanded = layers.LeakyReLU()(expanded)

    # Depthwise spatial filtering
    filtered = tf.keras.layers.DepthwiseConv2D(bneck_depth, padding='same', strides=strides)(expanded)
    filtered = tf.keras.layers.BatchNormalization()(filtered)
    filtered = layers.LeakyReLU()(filtered)
    if se:
        filtered = squeeze_excite_block(filtered, ratio=4)

    # 1x1 linear projection (no activation)
    projected = tf.keras.layers.Conv2D(squeeze, (1,1), strides=1, padding='same')(filtered)
    projected = tf.keras.layers.BatchNormalization()(projected)

    # The stride check comes first so no residual is attempted when the
    # spatial dimensions may have changed (or are None).
    if strides != 1:
        return projected
    # The residual also needs matching depth.
    if projected.get_shape().as_list()[3] != x.get_shape().as_list()[3]:
        return projected
    return tf.keras.layers.Add()([projected, x])



In [None]:
def bloc(top, num_filter):
    """Residual block: five 3x3 stages (conv, conv, depthwise, conv, conv).

    Each stage is followed by LeakyReLU. Channel width goes
    num_filter -> 2*num_filter -> (depthwise) -> 2*num_filter -> num_filter,
    and the result is added to ``top``, so ``top`` must already carry
    ``num_filter`` channels for the Add to be valid.

    Args:
        top: Input feature-map tensor (also the skip connection).
        num_filter: Base number of filters.

    Returns:
        Tensor ``stack(top) + top``.
    """
    widened = num_filter*2
    pipeline = [
        Conv2D(num_filter, (3,3), padding='same'),
        layers.LeakyReLU(),
        Conv2D(widened, (3,3), padding='same'),
        layers.LeakyReLU(),
        tf.keras.layers.DepthwiseConv2D(3, padding='same'),
        layers.LeakyReLU(),
        Conv2D(widened, (3,3), padding='same'),
        layers.LeakyReLU(),
        Conv2D(num_filter, (3,3), padding='same'),
        layers.LeakyReLU(),
    ]

    branch = top
    for stage in pipeline:
        branch = stage(branch)

    return tf.keras.layers.Add()([branch, top])

In [None]:
def create_model(count):
    """Build, compile and summarise the CNN classifier.

    Architecture: two-convolution stem, five residual ``bloc`` stages (32
    filters each) separated by 2x pooling, a two-convolution tail, then two
    512-unit dense layers with dropout and a ``count``-way output.

    The output layer uses softmax. The classes are mutually exclusive and the
    model is trained with categorical cross-entropy, which expects a
    probability distribution over classes; the previous per-class sigmoid did
    not produce one.

    Args:
        count: Number of target classes.

    Returns:
        A compiled ``keras.Model`` taking ``(shape, shape, 3)`` images, where
        ``shape`` is the notebook-level image size. The model summary is
        printed as a side effect.
    """
    inputs = keras.Input(shape=(shape, shape, 3))

    # Stem
    x = layers.Conv2D(16, 3, activation='relu', padding='same')(inputs)
    x = layers.Conv2D(32, 3, activation='relu', padding='same')(x)
    x = BatchNormalization()(x)
    x = layers.MaxPooling2D(2)(x)

    # Five residual stages, each followed by the pooling layer listed here
    # (same max/avg/avg/max/max sequence as before).
    stage_pools = (
        layers.MaxPooling2D,
        layers.AvgPool2D,
        layers.AvgPool2D,
        layers.MaxPooling2D,
        layers.MaxPooling2D,
    )
    for pool in stage_pools:
        x = bloc(x, 32)
        x = pool(2)(x)

    # Tail
    x = BatchNormalization()(x)
    x = layers.Conv2D(32, 3, activation='relu', padding='same')(x)
    x = layers.Conv2D(16, 3, activation='relu', padding='same')(x)

    # Classifier head
    x = layers.Flatten()(x)
    x = layers.Dense(512)(x)
    x = layers.LeakyReLU()(x)
    x = layers.Dense(512)(x)
    x = layers.LeakyReLU()(x)
    x = layers.Dropout(0.5)(x)

    outputs = layers.Dense(count, activation='softmax')(x)

    model = keras.Model(inputs, outputs)

    # Labels arrive one-hot encoded from the image generators, hence the
    # non-sparse categorical loss.
    model.compile(
        loss='categorical_crossentropy',
        optimizer='adam',
        metrics=['acc'])
    model.summary()

    return model

In [None]:
# Build a 2-class model up front; create_model also prints the layer summary.
# NOTE(review): the training cells below use `model_2`, and `model` is rebound by later load_model cells.
model = create_model(2)

In [None]:
# Change to actual dataset path
# Find the dataset here: https://www.kaggle.com/datasets/jamilurrahman/covrecker2/data

DIR_TARIN_PATH = '../input/covrecker2/new_dataset/train'

# (file-name marker, class label), checked in this order. A file name that
# contains none of the markers is treated as NORMAL.
_MARKER_LABELS = (
    ('BACTERIA', 'PNEUMONIA_BACTERIA'),
    ('VIRAL', 'PNEUMONIA_VIRAL'),
    ('COVID', 'COVID'),
)


def _label_for(filename):
    """Return the class label implied by an image file name."""
    for marker, label in _MARKER_LABELS:
        if marker in filename:
            return label
    return 'NORMAL'


cov = []
vir = []
bac = []
nor = []
_buckets = {
    'COVID': cov,
    'PNEUMONIA_VIRAL': vir,
    'PNEUMONIA_BACTERIA': bac,
    'NORMAL': nor,
}

# os.listdir returns entries in arbitrary order; sorting makes the capped
# selection below the same on every run.
for x in sorted(os.listdir(DIR_TARIN_PATH)):
    _buckets[_label_for(x)].append(x)

# Per-class caps (same limits as before: indices 0..428, 0..510, 0..700, 0..460).
master = cov[:429] + vir[:511] + bac[:701] + nor[:461]

# Labels are spelled PNEUMONIA_* (previously misspelled "PNEUMINIA_*") so they
# agree with the class names used in the prediction cells.
labels = [_label_for(x) for x in master]

data = {
    'path': master,
    'labels': labels
}

df = pd.DataFrame(data)
# to_csv returns None when given a path, so the result is not kept.
df.to_csv('4_CLASS.csv')

In [None]:
def train_model(model, train, val, types):
    """Fit ``model`` on the given generators and return the Keras History.

    Args:
        model: Compiled Keras model.
        train: Training data generator.
        val: Validation data generator.
        types: Tag inserted into the checkpoint file name
            (``models/model-<types>.h5``).

    Returns:
        The ``History`` object returned by ``model.fit``. Training runs for
        the notebook-level ``epoch`` count; the learning rate is reduced on a
        val_loss plateau and the best val_loss weights are checkpointed.
    """
    checkpoint_path = os.path.join('models/model-{}.h5'.format(types))

    # Constructed but intentionally not included in the callback list below.
    early_stop = EarlyStopping(
        monitor='val_loss',
        min_delta=0.001,
        patience=7,
        mode='min',
        restore_best_weights=True,
    )
    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=3,
        min_delta=0.001,
    )
    checkpoint = ModelCheckpoint(
        checkpoint_path,
        monitor='val_loss',
        verbose=0,
        save_best_only=True,
    )

    fit_options = dict(
        validation_data=val,
        epochs=epoch,
        shuffle=True,
        workers=4,
        use_multiprocessing=True,
        callbacks=[reduce_lr, checkpoint],
    )
    return model.fit(train, **fit_options)

In [None]:
def plot_matric(history):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        history: Keras ``History`` whose ``.history`` dict contains the keys
            'acc', 'loss', 'val_acc' and 'val_loss'.
    """
    logs = history.history
    acc, loss, val_acc, val_loss = (
        logs[key] for key in ('acc', 'loss', 'val_acc', 'val_loss')
    )
    epochs_range = range(len(acc))

    # (subplot position, training series, validation series,
    #  training legend, validation legend, legend location, title)
    panels = (
        (1, acc, val_acc, 'Training Accuracy', 'Validation Accuracy',
         'lower right', 'Training and Validation Accuracy'),
        (2, loss, val_loss, 'Training Loss', 'Validation Loss',
         'upper right', 'Training and Validation Loss'),
    )

    plt.figure(figsize=(8, 8))
    for position, train_curve, val_curve, train_name, val_name, corner, heading in panels:
        plt.subplot(1, 2, position)
        plt.plot(epochs_range, train_curve, label=train_name)
        plt.plot(epochs_range, val_curve, label=val_name)
        plt.legend(loc=corner)
        plt.title(heading)
    plt.show()

In [None]:
def generate_pred(model, images, true_label, labels,
                  data_dir='../input/covrecker2/new_dataset/test'):
    """Predict a class label for every image file and print a short report.

    Args:
        model: Trained Keras model whose output is one score per class.
        images: Iterable of image file names, relative to ``data_dir``.
        true_label: Ground-truth labels; passed through unchanged.
        labels: Class names indexed by the model's output position. Their
            order must match the class indices used during training.
        data_dir: Directory containing the image files. Defaults to the
            location that was previously hard-coded.

    Returns:
        ``(true_label, y_pred)`` where ``y_pred`` is the list of predicted
        class names, one per entry in ``images``.
    """
    y_pred = []
    # enumerate supplies the running image number; the old counter was never
    # incremented, so every line was reported as image 1.
    for count, file in enumerate(images, start=1):
        img = keras.preprocessing.image.load_img(
            os.path.join(data_dir, str(file)), target_size=(shape, shape)
        )
        img_array = keras.preprocessing.image.img_to_array(img)
        img_array = tf.expand_dims(img_array, 0) # Create a batch

        # Models built by create_model end in an activation layer, so their
        # output is already a per-class score, not logits. Applying softmax
        # again would flatten the reported confidence; the argmax is the same.
        score = np.asarray(model.predict(img_array)[0])
        best = int(np.argmax(score))
        y_pred.append(labels[best])

        print(
            "{} - Most likely {} with a {:.2f} percent confidence."
            .format(count, labels[best], 100 * float(score[best]))
        )
        print(file, '\n')
    return true_label, y_pred

In [None]:
import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix


def plt_conf(labels, tr, pr):
    """Draw a confusion-matrix heatmap for the given predictions.

    Args:
        labels: Display names for the matrix rows/columns. They are applied
            positionally, so they must be listed in the same order that
            ``confusion_matrix`` uses for the classes (sorted label values
            when no explicit order is passed), and there must be exactly one
            per class present in ``tr``/``pr``.
        tr: Ground-truth labels.
        pr: Predicted labels.
    """
    conf_2 = confusion_matrix(tr, pr)
    df_cm = pd.DataFrame(conf_2, index=list(labels), columns=list(labels))
    sn.set(font_scale=1.1)
    plt.figure(figsize = (10, 7))
    # fmt='d' prints whole counts; the default format switches to scientific
    # notation for counts of 100 or more.
    ax = sn.heatmap(df_cm, annot=True, fmt='d')
    # confusion_matrix puts true classes on rows and predictions on columns.
    ax.set_xlabel('Predicted label')
    ax.set_ylabel('True label')

In [None]:
# Label tables for the 2-, 3- and 4-class experiments; generate_data reads their 'path' and 'labels' columns.
class_2_df = pd.read_csv('../input/covrecker2/new_dataset/csv/2_CLASS.csv')
class_3_df = pd.read_csv('../input/ct-data-x/3_CLASS_CT.csv')
# This file is produced by the dataset-assembly cell earlier in the notebook (df.to_csv('4_CLASS.csv')).
class_4_df = pd.read_csv('./4_CLASS.csv')


# class_2_df.groupby('labels').count()


In [None]:
# Row count per class label in the 3-class table.
class_3_df.groupby('labels').count()

In [None]:
# Row count per class label in the 4-class table.
class_4_df.groupby('labels').count()

# RUN ALL TILL HERE

In [None]:
# Training / validation generators for the 2-class task (generate_data shuffles the table and holds out a validation subset).
train_2, val_2 = generate_data(class_2_df, '../input/covrecker2/new_dataset/train')

In [None]:
# test_df = pd.read_csv('../input/ct-data-x/2_CLASS_CT_TEST.csv')

# test_df.head()

In [None]:
# fig = plt.figure()
# ax = fig.add_axes([0,0,1,1])
# ax.bar(['COVID', 'NORMAL'],[1200, 1341])

In [None]:
# Fresh 2-class network; create_model also prints the layer summary.
model_2 = create_model(2)

In [None]:
# Train the 2-class model; train_model checkpoints the best weights to models/model-2.h5 (types=2).
history_2 = train_model(model_2, train_2, val_2, 2 )

In [None]:
# Predict on the first 29 rows of the 2-class test table.
# NOTE(review): the ground-truth column is read as 'lable' here but as 'labels' for the other test CSVs — confirm it matches this file's header.
test_df_2 = pd.read_csv('../input/covrecker2/new_dataset/csv/test.csv')
true_2, pred_2 = generate_pred(model_2, test_df_2['path'][:29].values, test_df_2['lable'][:29].values,['COVID', 'NORMAL'], )

In [None]:
# Ground-truth and predicted labels, printed for a quick side-by-side look.
print(true_2, pred_2)

In [None]:
# Confusion-matrix heatmap for the 2-class predictions.
plt_conf(['COVID', 'NORMAL'], true_2, pred_2)

In [None]:
# Load the saved 2-class model once; it used to be reloaded from disk for
# every image, which is loop-invariant work.
model = tf.keras.models.load_model('./models/model-2.h5')
labels = ['COVID', 'NORMAL']

# Print the predicted class for every file in the experiment folder.
for file in os.listdir(os.path.join('../input/expdata')):
    img = keras.preprocessing.image.load_img(
            os.path.join('../input/expdata', str(file)), target_size=(shape, shape)
        )
    img_array = keras.preprocessing.image.img_to_array(img)
    img_array = tf.expand_dims(img_array, 0) # Create a batch

    predictions = model.predict(img_array)
    # Only the arg-max is reported, so the extra softmax over the model's
    # already-activated outputs is not needed.
    score = predictions[0]
    print(labels[np.argmax(score)])

In [None]:
# Convert the saved 2-class Keras HDF5 model to TensorFlow.js format, writing the result into ./models.
!tensorflowjs_converter --input_format keras \
                       ./models/model-2.h5 \
                       ./models --weight_shard_size_bytes 250000000

In [None]:
import tensorflowjs as tfjs
# Export through the tensorflowjs Python API: reload the saved 2-class model and write it into the 'models' directory.
# NOTE: this rebinds the global `model`.
model = tf.keras.models.load_model('./models/model-2.h5')
tfjs.converters.save_keras_model(model, os.path.join('models'))

In [None]:
from sklearn.metrics import confusion_matrix
# Binary metrics for the 2-class run.
# With labels=['COVID', 'NORMAL'] the ravelled matrix is
# [COVID->COVID, COVID->NORMAL, NORMAL->COVID, NORMAL->NORMAL], so the
# TN/FP/FN/TP naming below treats NORMAL as the positive class.
# NOTE(review): confirm NORMAL (rather than COVID) is the intended positive class.
x = confusion_matrix(true_2, pred_2, labels=['COVID', 'NORMAL']).ravel()
TN, FP, FN, TP = x

sp = TN/(TN+FP)
# Named `sensitivity`, not `sn`: `sn` is the seaborn alias used by plt_conf,
# and rebinding it here broke every later heatmap call.
sensitivity = TP/(TP+FN) # RECALL
acc = (TP+TN)/(TP+TN+FP+FN)
pr = TP/ (TP +FP)
f1 = 2*(pr * sensitivity) / (pr+ sensitivity)

print('Specificity ', sp, '\n', 'Sencitivity ', sensitivity, '\n', 'Accurecy ', acc, '\n', 'Percision ', pr, '\n', 'F1 ', f1, )

In [None]:
# Write the model summary to report.txt with the model and its layers renamed.
with open('report.txt','w') as fh:
    model_2._name = 'CoroPy'
    for layer in model_2.layers:
        # Only add the suffix once, so re-running this cell does not keep
        # lengthening the layer names.
        if not layer._name.endswith('_coro_py'):
            layer._name = layer._name + '_coro_py'
    # summary() calls print_fn for each line; the lambda redirects those
    # lines into the open file.
    model_2.summary(print_fn=lambda x: fh.write(x + '\n'))

In [None]:
# Accuracy and loss curves for the 2-class run.
plot_matric(history_2)

In [None]:
# 3-class experiment: fresh network plus training/validation generators over the CT training images.
model_3 = create_model(3)
train_3, val_3 = generate_data(class_3_df, '../input/ct-data-x/CTTRAIN')

In [None]:
# Train the 3-class model; train_model checkpoints the best weights to models/model-3.h5 (types=3).
history_3 = train_model(model_3, train_3, val_3 , 3)

In [None]:
# Predict on the 3-class CT test table.
# NOTE(review): generate_pred resolves file names against the covrecker2 test directory by default, while this CSV comes from ct-data-x — confirm the paths resolve.
test_df_3 = pd.read_csv('../input/ct-data-x/3_CLASS_CT_TEST.csv')

true_3, pred_3 = generate_pred(model_3, test_df_3['path'].values, test_df_3['labels'].values,['COVID', 'NORMAL', 'VIRAL_PNEUMONIA'], )

In [None]:
# Ground-truth and predicted labels for the 3-class run.
print(true_3, pred_3)

In [None]:
# Confusion-matrix heatmap for the 3-class predictions.
plt_conf(['COVID', 'NORMAL', 'VIRAL_PNEUMONIA'], true_3, pred_3)

In [None]:
from sklearn.metrics import confusion_matrix
# Per-class (one-vs-rest) metrics for the 3-class run.
# The previous version ravelled the 3x3 matrix and read its first four cells
# as TN/FP/FN/TP, which is only meaningful for a 2x2 matrix.
class_names_3 = ['COVID', 'NORMAL', 'VIRAL_PNEUMONIA']
cm_3 = confusion_matrix(true_3, pred_3, labels=class_names_3)

# Rows are true classes and columns are predictions, so for each class:
TP = np.diag(cm_3)                 # predicted as the class and correct
FP = cm_3.sum(axis=0) - TP         # predicted as the class but wrong
FN = cm_3.sum(axis=1) - TP         # belongs to the class but missed
TN = cm_3.sum() - (TP + FP + FN)   # everything else

# A class with an empty denominator yields nan (with a NumPy warning).
sp = TN/(TN+FP)
# Named `sensitivity`, not `sn`: `sn` is the seaborn alias used by plt_conf.
sensitivity = TP/(TP+FN) # RECALL
pr = TP/ (TP +FP)
f1 = 2*(pr * sensitivity) / (pr+ sensitivity)
# Overall accuracy: correctly classified samples over all samples.
acc = TP.sum()/cm_3.sum()

for i, name in enumerate(class_names_3):
    print(name, '\n', 'Specificity ', sp[i], '\n', 'Sencitivity ', sensitivity[i], '\n', 'Percision ', pr[i], '\n', 'F1 ', f1[i], )
print('Accurecy ', acc)

In [None]:
# Accuracy and loss curves for the 3-class run.
plot_matric(history_3)

In [None]:
# Fresh 4-class network; create_model also prints the layer summary.
model_4 = create_model(4)

In [None]:
# Training / validation generators for the 4-class task.
train_4, val_4 = generate_data(class_4_df, '../input/covrecker2/new_dataset/train')

In [None]:
# Train the 4-class model; train_model checkpoints the best weights to models/model-4.h5 (types=4).
history_4 = train_model(model_4, train_4, val_4, 4 )

In [None]:
# Evaluate the best 4-class checkpoint on the held-out test table.
md = tf.keras.models.load_model('./models/model-4.h5')
# md = model_4
test_df_4 = pd.read_csv('../input/test-csv/test.csv')
# The class names must follow the index order used in training.
# flow_from_dataframe assigns indices to the label values in sorted order,
# which puts the BACTERIA class before the VIRAL class; the previous list had
# those two swapped.
true_4, pred_4 = generate_pred(md, test_df_4['path'].values, test_df_4['labels'].values, ["COVID", "NORMAL", "PNEUMONIA_BACTERIA", "PNEUMONIA_VIRAL"])

In [None]:
# Ground-truth labels of the 4-class test set.
print(true_4)

In [None]:
# plt_conf applies these names positionally to a matrix whose classes are in
# sorted label order (COVID, NORMAL, PNEUMONIA_BACTERIA, PNEUMONIA_VIRAL), so
# the bacterial class must be listed before the viral one.
plt_conf(['COVID', 'NORMAL', 'BACTERIAL_PNEUMONIA', 'VIRAL_PNEUMONIA'], true_4, pred_4)

In [None]:
# Highest training and validation accuracy reached during the 4-class run, as percentages.
# NOTE(review): `history_4` is rebound to a list by the K-fold loop later in the notebook; this cell expects the single History object.
acc = history_4.history['acc']
val_acc = history_4.history['val_acc']

print('Max ACC, ', max(acc)*100, '% ', '\nVAL ACC, ', max(val_acc)*100, '%')

In [None]:
# Accuracy and loss curves for the 4-class run.
plot_matric(history_4)

# KFOLD

In [None]:
from sklearn.metrics import confusion_matrix

def get_metrics(true, pred, cols):
    """Print specificity, sensitivity, accuracy, precision and F1 for a binary task.

    Args:
        true: Ground-truth labels.
        pred: Predicted labels.
        cols: The two class labels as ``[negative, positive]``; the second
            entry is treated as the positive class.

    Raises:
        ValueError: If ``cols`` does not describe exactly two classes. The
            TN/FP/FN/TP unpacking below is only defined for a 2x2 matrix;
            larger matrices used to produce silently wrong numbers.
    """
    cm = confusion_matrix(true, pred, labels=cols)
    if cm.shape != (2, 2):
        raise ValueError(
            'get_metrics expects exactly two class labels, got {}'.format(len(cols))
        )

    # Row-major order of a 2x2 matrix (rows = true, columns = predicted).
    TN, FP, FN, TP = cm.ravel()

    sp = TN/(TN+FP)
    sn = TP/(TP+FN) # RECALL
    acc = (TP+TN)/(TP+TN+FP+FN)
    pr = TP/ (TP +FP)
    f1 = 2*(pr * sn) / (pr+ sn)

    print('Specificity ', sp, '\n', 'Sencitivity ', sn, '\n', 'Accurecy ', acc, '\n', 'Percision ', pr, '\n', 'F1 ', f1, )

In [None]:
def generate_data_2(tr, val, data_dir):
    """Create image generators from pre-split training and validation tables.

    Args:
        tr: DataFrame of training rows ('path' and 'labels' columns).
        val: DataFrame of validation rows (same columns).
        data_dir: Directory that contains the image files.

    Returns:
        ``(train_imagegen, valid_imagegen)``. The training generator applies
        flips, rotation, shear, zoom and shifts; the validation generator
        applies none. Both resize to ``(shape, shape)`` with batch size 32.
    """
    # Options common to both flows.
    flow_options = dict(
        directory=data_dir,
        x_col='path',
        y_col='labels',
        target_size=(shape, shape),
        batch_size=32,
    )

    augmenting = keras.preprocessing.image.ImageDataGenerator(
        horizontal_flip=True,
        vertical_flip=True,
        rotation_range=20,
        shear_range=20,
        zoom_range=0.2,
        height_shift_range=0.3,
        width_shift_range=0.3,
    )
    train_imagegen = augmenting.flow_from_dataframe(tr, **flow_options)

    plain = keras.preprocessing.image.ImageDataGenerator()
    valid_imagegen = plain.flow_from_dataframe(val, **flow_options)

    return train_imagegen, valid_imagegen

In [None]:
from sklearn.model_selection import KFold

# Shuffle the 2-class table, then record the row positions of each of the
# five train/validation splits.
df = class_2_df.sample(frac=1).reset_index(drop=True)
fold = KFold(n_splits=5, shuffle=True, random_state=1)

train_index, vald_index = [], []
for tr_index, val_index in fold.split(df):
    train_index += [tr_index]
    vald_index += [val_index]

# Single-fold run, 2-class

In [None]:
# Train on one fold of the 2-class K-fold split.
# NOTE(review): the banner says 'FOLD 2' but the split used below is index 4 (the fifth fold) — confirm which is intended.
print('FOLD 2')
train = df.iloc[train_index[4]]

val = df.iloc[vald_index[4]]
#     data = pd.concat([train, val], ignore_index=True)
# NOTE(review): the K-fold cell derives `df` from class_2_df (read from the covrecker2 CSV), but images are looked up under ct-data-x/CTTRAIN — confirm the paths resolve.
train_data, val_data = generate_data_2(train, val, '../input/ct-data-x/CTTRAIN')
# model_2_f1 = create_model(2)
# Continues training from the saved 2-class checkpoint instead of a fresh network.
model_2_f1 = tf.keras.models.load_model('./models/model-2.h5')

# train_model checkpoints to models/model-2.h5 (types=2), i.e. the same file that was just loaded.
history_2_f1 = train_model(model_2_f1, train_data, val_data, 2 )

In [None]:
# history = []
# for i in range(len(train_index)):
#     print('fold ', i)

#     train = df.iloc[train_index[i]]

#     val = df.iloc[vald_index[i]]
# #     data = pd.concat([train, val], ignore_index=True)
#     train_data, val_data = generate_data_2(train, val, '../input/covrecker2/new_dataset/train')
#     model_2 = create_model(2)

#     history_2 = train_model(model_2, train_data, val_data, 2 )
#     history.append(history_2)




In [None]:
# Predict with the fold model on the 2-class CT test table (rebinds test_df_2, true_2 and pred_2).
# NOTE(review): generate_pred resolves file names against the covrecker2 test directory by default, while this CSV comes from ct-data-x — confirm the paths resolve.
test_df_2 = pd.read_csv('../input/ct-data-x/2_CLASS_CT_TEST.csv')
true_2, pred_2 = generate_pred(model_2_f1, test_df_2['path'].values, test_df_2['labels'].values,['COVID', 'NORMAL', ], )

In [None]:
# Confusion-matrix heatmap for the fold model's 2-class predictions.
plt_conf(['COVID', 'NORMAL',], true_2, pred_2)

In [None]:
# for i in history:
# Accuracy and loss curves for the single-fold 2-class run.
plot_matric(history_2_f1)

In [None]:
# Print binary metrics for the fold model; get_metrics takes the second label ('NORMAL') as the positive class.
get_metrics(true_2, pred_2, ['COVID', 'NORMAL',])

In [None]:
# from sklearn.model_selection import KFold

# train_index = []
# vald_index = []
# fold = KFold(n_splits=5, shuffle=True, random_state=1)
# df_3 = class_3_df.sample(frac=1).reset_index(drop=True)
# for tr_index, val_index in fold.split(df_3):
#     train_index.append(tr_index)
#     vald_index.append(val_index)

In [None]:
# history_3 = []
# for i in range(len(train_index)):
#     print('fold ', i)

#     train = df_3.iloc[train_index[i]]
#     val = df_3.iloc[vald_index[i]]
# #     data = pd.concat([train, val], ignore_index=True)
#     train_data, val_data = generate_data_2(train, val, '../input/covrecker2/new_dataset/train')
#     model_3 = create_model(3)
#     history = train_model(model_3, train_data, val_data, 3 )
#     history_3.append(history)

In [None]:
# model_3 = tf.keras.models.load_model('../input/model33/model-3.h5')
# test_df_3 = pd.read_csv('../input/test-csv/test.csv')

# true_3, pred_3 = generate_pred(model_3, test_df_3[:44]['path'].values, test_df_3[:44]['labels'].values,['COVID', 'NORMAL', 'PNEUMONIA_VIRAL'], )

In [None]:
# plt_conf(['COVID', 'NORMAL', 'PNEUMONIA_VIRAL'], true_3, pred_3)

In [None]:
# for i in history_3:
#     plot_matric(i)

In [None]:
from sklearn.model_selection import KFold

# Shuffle the 4-class table, then record the row positions of each of the
# five train/validation splits (rebinds train_index / vald_index).
df_4 = class_4_df.sample(frac=1).reset_index(drop=True)
fold = KFold(n_splits=5, shuffle=True, random_state=1)

train_index, vald_index = [], []
for tr_index, val_index in fold.split(df_4):
    train_index += [tr_index]
    vald_index += [val_index]

In [None]:
# 5-fold cross-validation for the 4-class task.

# The test table and class-name order are the same for every fold, so they
# are set up once outside the loop.
test_df_4 = pd.read_csv('../input/test-csv/test.csv')
# Class names in training index order: flow_from_dataframe sorts the label
# values, which puts the BACTERIA class before the VIRAL class.
class_names_4 = ["COVID", "NORMAL", "PNEUMONIA_BACTERIA", "PNEUMONIA_VIRAL"]

history_4 = []
for i in range(len(train_index)):
    print('fold ', i)
    train = df_4.iloc[train_index[i]]
    val = df_4.iloc[vald_index[i]]
    train_data, val_data = generate_data_2(train, val, '../input/covrecker2/new_dataset/train')
    model_4 = create_model(4)
    history = train_model(model_4, train_data, val_data, 4 )
    history_4.append(history)

    # Evaluate THIS fold: reload the best checkpoint train_model just wrote
    # (models/model-4.h5). The previous code loaded an unrelated pre-trained
    # model from ../input/model44, so every fold reported the same model.
    model_4 = tf.keras.models.load_model('./models/model-4.h5')

    # Only the first 59 test rows are evaluated.
    true_4, pred_4 = generate_pred(model_4, test_df_4[:59]['path'].values, test_df_4[:59]['labels'].values, class_names_4)
    plt_conf(class_names_4, true_4, pred_4)

In [None]:
# plt_conf(['COVID', 'NORMAL', 'PNEUMONIA_VIRIAL','PNEUMONIA_BACTERIA'], true_4, pred_4)

In [None]:
# One accuracy/loss figure per fold of the 4-class cross-validation.
for i in history_4:
    plot_matric(i)