In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree and print the full path of every file found.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os

# Root of the dataset; the train and test splits live in sibling sub-folders.
base_dir = '../input/covid19-image-dataset/Covid19-dataset'
train_dir, test_dir = (os.path.join(base_dir, split) for split in ('train', 'test'))

In [None]:
# os.listdir returns entries in arbitrary order and also reports stray files.
# Keep only sub-directories (one per class) and sort them so that positions in
# these lists are stable from run to run.
train_lstnames = sorted(
    name for name in os.listdir(train_dir)
    if os.path.isdir(os.path.join(train_dir, name))
)
test_lstnames = sorted(
    name for name in os.listdir(test_dir)
    if os.path.isdir(os.path.join(test_dir, name))
)

In [None]:
# Display the entries found directly under the training directory.
train_lstnames

In [None]:
# Full path of every class folder, in the same order as the name lists.
train_types_dir = [os.path.join(train_dir, class_name) for class_name in train_lstnames]
test_types_dir = [os.path.join(test_dir, class_name) for class_name in test_lstnames]

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
# Map each training class name to the number of entries in its folder.
number_per_type = {
    class_name: len(os.listdir(class_dir))
    for class_name, class_dir in zip(train_lstnames, train_types_dir)
}

In [None]:
# One row per class, single 'Quantity' column holding the image count.
type_number_df = pd.DataFrame.from_dict(number_per_type, orient='index', columns=['Quantity'])

In [None]:
# Display the per-class image counts of the training split.
type_number_df

In [None]:
# Bar chart of the per-class image counts in the training split.
type_number_df.plot.bar(
    x=None,
    y='Quantity',
    figsize=(20, 10),
    title='Number of image for each type',
    fontsize=20,
)

In [None]:
# Summary statistics of the 'Quantity' column (image count per class).
type_number_df.describe()

In [None]:
def generate_random_image(train_lstnames, number_random_types, number_random_per_types, train_types_dir, df):
    """Pick random image paths from randomly chosen class folders.

    Parameters
    ----------
    train_lstnames : sequence of str
        Class (folder) names; position ``i`` must correspond to
        ``train_types_dir[i]``.
    number_random_types : int
        Number of distinct classes to sample (without replacement).
    number_random_per_types : int
        Number of distinct images to sample from each chosen class.
    train_types_dir : sequence of str
        Directory of every class, aligned with ``train_lstnames``.
    df : pandas.DataFrame
        Per-class image counts. Accepted for backward compatibility only: the
        number of candidates is taken from the directory listing itself, so a
        table that is out of date can no longer produce an out-of-range index.

    Returns
    -------
    dict
        Maps each sampled class name to a list of image file paths.

    Raises
    ------
    ValueError
        Propagated from ``np.random.choice`` when more classes or images are
        requested than are available.
    """
    types_idx = np.random.choice(len(train_lstnames), number_random_types, replace=False)

    next_image = {}
    for idx in types_idx:
        class_dir = train_types_dir[idx]
        # os.listdir order is arbitrary; sorting makes a fixed NumPy seed pick
        # the same files on every run.
        file_names = sorted(os.listdir(class_dir))
        chosen = np.random.choice(len(file_names), number_random_per_types, replace=False)
        next_image[train_lstnames[idx]] = [
            os.path.join(class_dir, file_names[file_idx]) for file_idx in chosen
        ]

    return next_image

In [None]:
def _to_model_batch(img, target_w, target_h):
    """Turn an image array into a float32 batch of one ``target_h`` x ``target_w`` RGB image."""
    import cv2  # local import, as before: only the prediction path needs OpenCV

    pixels = np.asarray(img)
    # Integer images are scaled to [0, 1]; float images are left as they are
    # (matplotlib's imread already returns PNG data as floats in [0, 1]).
    if np.issubdtype(pixels.dtype, np.integer):
        max_value = np.iinfo(pixels.dtype).max
        pixels = pixels.astype(np.float32) / max_value
    else:
        pixels = pixels.astype(np.float32)

    # Bring every image to exactly three channels.
    # NOTE(review): assumes a channels-last, 3-channel model — confirm if reused elsewhere.
    if pixels.ndim == 2:
        pixels = np.stack([pixels] * 3, axis=-1)
    elif pixels.shape[-1] == 1:
        pixels = np.repeat(pixels, 3, axis=-1)
    elif pixels.shape[-1] == 4:
        pixels = pixels[..., :3]

    # cv2.resize takes the destination size as (width, height).
    pixels = cv2.resize(np.ascontiguousarray(pixels), (target_w, target_h))
    return np.expand_dims(pixels, axis=0)


def plot_image(number_random_types, number_random_per_types, image_size, next_image, class_names = None, model  = None, get_prediction = False):
    """Show a grid of images, optionally titled with the model's prediction.

    Parameters
    ----------
    number_random_types : int
        Number of grid rows (one per sampled class).
    number_random_per_types : int
        Number of grid columns (images per class).
    image_size : float
        Size in inches given to each grid cell.
    next_image : dict
        Maps a class label to a list of image file paths.
    class_names : dict, optional
        Maps class name to integer index; required when ``get_prediction`` is
        True. Ignored otherwise.
    model : optional
        Object exposing ``input_shape`` and ``predict``; required when
        ``get_prediction`` is True. Ignored otherwise.
    get_prediction : bool
        When True each image is run through ``model`` and the title shows the
        predicted and the true label.

    Raises
    ------
    ValueError
        If ``get_prediction`` is True but ``model`` or ``class_names`` is missing.
    """
    if get_prediction:
        if model is None or class_names is None:
            raise ValueError('model and class_names are required when get_prediction is True')
        inverse_class_names = {int(index): name for name, index in class_names.items()}
        # Resize to whatever the model was built for instead of a fixed size.
        # NOTE(review): assumes input_shape is (batch, height, width, channels).
        _, target_h, target_w, _ = model.input_shape

    nrows = number_random_types
    ncols = number_random_per_types

    fig = plt.gcf()
    # set_size_inches expects (width, height): columns drive width, rows height.
    fig.set_size_inches(image_size * ncols, image_size * nrows)

    count = 0
    for label, img_paths in next_image.items():
        for img_path in img_paths:
            sb = plt.subplot(nrows, ncols, count + 1)
            img = mpimg.imread(img_path)
            sb.set_title(f'{label}, {img.shape}', color = 'r')

            if get_prediction:
                # The resized copy is what goes to the model; the original
                # image is still the one displayed below.
                pred = model.predict(_to_model_batch(img, target_w, target_h))
                print('Prediction', np.argmax(pred))
                pred_name = inverse_class_names[int(np.argmax(pred))]
                sb.set_title(f'Predicted: {pred_name} \nGround True: {label}', color = 'r', fontsize = 10)

            sb.axis('Off')

            plt.imshow(img)
            count += 1
    plt.show()

In [None]:
import numpy as np
import matplotlib.image as mpimg

In [None]:
# Preview grid: 3 random classes (rows) x 4 random images each (columns).
number_random_types = 3
number_random_per_types = 4

next_image = generate_random_image(
    train_lstnames=train_lstnames,
    number_random_types=number_random_types,
    number_random_per_types=number_random_per_types,
    train_types_dir=train_types_dir,
    df=type_number_df,
)
plot_image(
    number_random_types=number_random_types,
    number_random_per_types=number_random_per_types,
    image_size=5,
    next_image=next_image,
)

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def get_train_generator(image_dir, shuffle=True, batch_size=8, seed=1, target_w = 320, target_h = 320):
    """Build the augmented training generator that reads class sub-folders of ``image_dir``.

    Each image is standardised on its own (sample-wise centering and std
    normalisation) and randomly sheared, zoomed, rotated, shifted and
    horizontally flipped.

    Parameters
    ----------
    image_dir : str
        Directory containing one sub-folder per class.
    shuffle : bool
        Whether the generator shuffles the images.
    batch_size : int
        Number of images per batch.
    seed : int
        Seed forwarded to ``flow_from_directory``.
    target_w, target_h : int
        Width and height every image is resized to.

    Returns
    -------
    The iterator returned by ``ImageDataGenerator.flow_from_directory`` with
    ``class_mode="categorical"``.
    """
    print("getting train generator...")
    image_generator = ImageDataGenerator(
        samplewise_center=True,
        samplewise_std_normalization= True, 
        shear_range=0.1,
        zoom_range=0.15,
        rotation_range=5,
        width_shift_range=0.1,
        height_shift_range=0.05,
        horizontal_flip=True, 
        vertical_flip = False, 
        fill_mode = 'reflect')
    
    generator = image_generator.flow_from_directory(
            directory= image_dir,
            class_mode="categorical",
            batch_size=batch_size,
            shuffle=shuffle,
            seed=seed,
            # Keras expects target_size as (height, width), not (width, height).
            target_size=(target_h, target_w))
    
    return generator

In [None]:
def get_test_generator(train_image_dir, image_dir, sample_size=100, batch_size=8, seed=1, target_w = 320, target_h = 320):
    """Build the evaluation generator, normalised with statistics from a training sample.

    One batch of ``sample_size`` raw training images is drawn and used to fit
    the feature-wise mean and standard deviation that are then applied to the
    images read from ``image_dir``.

    Parameters
    ----------
    train_image_dir : str
        Training directory (one sub-folder per class) the statistics are taken from.
    image_dir : str
        Directory (one sub-folder per class) the returned generator reads.
    sample_size : int
        Number of training images used to estimate mean and std.
    batch_size : int
        Batch size of the returned generator.
    seed : int
        Seed used both for drawing the training sample and for the returned generator.
    target_w, target_h : int
        Width and height every image is resized to.

    Returns
    -------
    The non-shuffling iterator returned by ``flow_from_directory`` with
    ``class_mode="categorical"``.
    """
    print("getting test generator...")
    # Keras expects target_size as (height, width).
    target_size = (target_h, target_w)

    # get generator to sample dataset
    raw_train_generator = ImageDataGenerator().flow_from_directory(
        directory=train_image_dir,  
        class_mode="categorical", 
        batch_size=sample_size, 
        shuffle=True, 
        # seeded so the fitted mean/std are the same on every run
        seed=seed,
        target_size=target_size)
    
    # get data sample; the builtin next() works whether or not the iterator
    # still exposes a .next() method
    batch = next(raw_train_generator)
    data_sample = batch[0]

    # use sample to fit mean and std for test set generator
    image_generator = ImageDataGenerator(
        featurewise_center=True,
        featurewise_std_normalization= True)
    
    # fit generator to sample from training data
    image_generator.fit(data_sample)

    test_generator = image_generator.flow_from_directory(
            directory=image_dir,
            class_mode="categorical",
            batch_size=batch_size,
            shuffle=False,
            seed=seed,
            target_size=target_size)
    
    return test_generator

In [None]:
# Image size handed to the generators as target_w / target_h.
IMAGE_SIZE = [320] * 2

# Training length and batch sizes for the training and validation generators.
EPOCHS = 20
BATCH_SIZE = 64
VAL_BATCH_SIZE = 16

In [None]:
# Both generators resize to the same width/height taken from IMAGE_SIZE.
_target_size_kwargs = dict(target_w=IMAGE_SIZE[0], target_h=IMAGE_SIZE[1])

train_generator = get_train_generator(
    image_dir=train_dir,
    batch_size=BATCH_SIZE,
    **_target_size_kwargs,
)
test_generator = get_test_generator(
    train_image_dir=train_dir,
    image_dir=test_dir,
    batch_size=VAL_BATCH_SIZE,
    **_target_size_kwargs,
)

In [None]:
import tensorflow as tf
from tensorflow.keras.applications.densenet import DenseNet121
from tensorflow.keras.models import Model
from tensorflow.keras.models import load_model

In [None]:
# Input shape handed to DenseNet121; the leading 320 x 320 repeats IMAGE_SIZE,
# the trailing 3 is the channel count.
input_shape = (320, 320, 3)

In [None]:
# DenseNet121 backbone with ImageNet weights and without its classification
# top (include_top=False).
model = DenseNet121(input_shape=input_shape, weights='imagenet', include_top=False)
model.summary()

In [None]:
# Class-name -> integer-index mapping as assigned by the training generator.
class_names = train_generator.class_indices

In [None]:
# Share of training samples carrying each integer label.
# NOTE(review): which label index is which class is taken from the variable
# names only — confirm against class_names.
l_array = train_generator.labels
n_labels = len(l_array)
normal_freq = np.count_nonzero(l_array == 1) / n_labels
covid_freq = np.count_nonzero(l_array == 0) / n_labels
Pneumonia_freq = np.count_nonzero(l_array == 2) / n_labels

In [None]:
# Mark every layer of the DenseNet backbone as non-trainable.
for layer in model.layers:
    layer.trainable = False

In [None]:
from tensorflow.keras.layers import GlobalAveragePooling2D, Dropout, BatchNormalization, Activation, Dense

# The softmax layer needs one unit per class the generator actually emits.
# Counting raw directory entries instead would be wrong as soon as the training
# folder holds anything other than class sub-folders.
num_classes = len(train_generator.class_indices)

# Classification head stacked on the backbone output:
# global average pooling -> Dense(512) -> BatchNorm -> ReLU -> Dropout -> softmax.
x = GlobalAveragePooling2D()(model.output)

x = Dense(units = 512, kernel_initializer='he_normal')(x)
x = BatchNormalization(axis = -1)(x)
x = Activation('relu')(x)
x = Dropout(0.2)(x)

x = Dense(units = num_classes, activation = 'softmax')(x)

final_model = Model(inputs = [model.input], outputs = [x])

final_model.summary()

In [None]:
from tensorflow.keras.optimizers import Adam

# Categorical cross-entropy with Adam, tracking accuracy.
# NOTE(review): the fit call also passes a LearningRateScheduler callback, which
# presumably overrides the learning rate given here at every epoch — confirm.
final_model.compile(loss = 'categorical_crossentropy', optimizer=Adam(learning_rate=1e-3, amsgrad=False), metrics = ['accuracy'])

In [None]:
def build_lrfn(lr_start=0.000002, lr_max=0.00010,
               lr_min=0, lr_rampup_epochs=8,
               lr_sustain_epochs=0, lr_exp_decay=.8):
    """Create an epoch -> learning-rate schedule with ramp-up, plateau and decay.

    The returned function rises linearly from ``lr_start`` towards ``lr_max``
    during the first ``lr_rampup_epochs`` epochs, holds ``lr_max`` for
    ``lr_sustain_epochs`` epochs, then decays exponentially towards ``lr_min``
    with factor ``lr_exp_decay`` per epoch.
    """
    sustain_end = lr_rampup_epochs + lr_sustain_epochs

    def lrfn(epoch):
        # Phase 1: linear ramp-up.
        if epoch < lr_rampup_epochs:
            return (lr_max - lr_start) / lr_rampup_epochs * epoch + lr_start
        # Phase 2: hold the peak rate.
        if epoch < sustain_end:
            return lr_max
        # Phase 3: exponential decay towards lr_min.
        epochs_decaying = epoch - lr_rampup_epochs - lr_sustain_epochs
        return (lr_max - lr_min) * lr_exp_decay**epochs_decaying + lr_min

    return lrfn

# Ramp-up/decay schedule with the default settings, wrapped as a Keras callback.
# NOTE(review): lr_schedule is not among the callbacks of the fit call visible
# in this notebook — confirm whether it is still needed.
lrfn = build_lrfn()
lr_schedule = tf.keras.callbacks.LearningRateScheduler(lrfn, verbose=True)

In [None]:
from tensorflow.keras.callbacks import LearningRateScheduler, ModelCheckpoint, EarlyStopping

def exponential_decay(lr0, s):
    """Return an epoch -> learning-rate function that starts at ``lr0`` and shrinks tenfold every ``s`` epochs."""
    def exponential_decay_fn(epoch):
        decay_factor = 0.1**(epoch / s)
        return lr0*decay_factor

    return exponential_decay_fn

# Learning rate starts at 0.01 and is divided by 10 every 10 epochs.
exponential_decay_fn = exponential_decay(lr0=0.01, s = 10)
lr_scheduler = LearningRateScheduler(exponential_decay_fn)
# Keep only the best checkpoint; stop after 10 epochs without improvement and
# restore the best weights.
# NOTE(review): no `monitor` is given, so both rely on the Keras default metric — confirm it is the intended one.
model_checkpoint = ModelCheckpoint('my_checkpoint.h5', save_best_only=True)
early_stop = EarlyStopping(patience = 10, restore_best_weights=True)

class StopTraining(tf.keras.callbacks.Callback):
    """Stop training once validation accuracy reaches 0.95."""

    def on_epoch_end(self, epoch, logs=None):
        """Check ``val_accuracy`` in ``logs`` and request a stop at >= 0.95."""
        val_accuracy = (logs or {}).get('val_accuracy')
        # The metric is missing when no validation ran for this epoch;
        # comparing None with a float would raise TypeError.
        if val_accuracy is not None and val_accuracy >= 0.95:
            print('Reach the desirable accuracy')
            self.model.stop_training = True
            
stopTrain = StopTraining()

In [None]:
# Step counts must be whole numbers. len() of a Keras directory iterator is the
# number of batches per pass (samples / batch size, rounded up), so every image
# is seen once per epoch without hard-coding the dataset sizes.
# NOTE(review): the test generator doubles as validation data, so checkpointing
# and early stopping are driven by the test set — confirm this is intended.
history = final_model.fit(
    train_generator,
    epochs=EPOCHS,
    validation_data=test_generator,
    steps_per_epoch=len(train_generator),
    validation_steps=len(test_generator),
    verbose=1,
    callbacks=[lr_scheduler, model_checkpoint, early_stop, stopTrain],
    class_weight={0 : 0.8, 1:1, 2:1},
)

In [None]:
# Persist the trained model in HDF5 format inside the Kaggle working directory.
final_model.save('/kaggle/working/CovidNet.h5')

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
%matplotlib inline

# Pull the recorded curves out of the Keras History object.
metrics = history.history
acc, val_acc = metrics['accuracy'], metrics['val_accuracy']
loss, val_loss = metrics['loss'], metrics['val_loss']

epoch = history.epoch

# One figure per metric: training curve in red, validation curve in blue.
panels = (
    ('Training and Validation Accuracy', acc, 'Training accuracy', val_acc, 'Validation accuracy'),
    ('Training and Validation Loss', loss, 'Training loss', val_loss, 'Validation loss'),
)
for panel_no, (title, train_curve, train_label, val_curve, val_label) in enumerate(panels):
    if panel_no:
        # Start a fresh figure for every panel after the first.
        plt.figure()
    plt.plot(epoch, train_curve, label = train_label, color = 'r')
    plt.plot(epoch, val_curve, label = val_label, color = 'b')
    plt.title(title)
    plt.legend()

plt.show()

In [None]:
# Count the entries of every test class folder.
number_per_types_test = {
    class_name: len(os.listdir(class_dir))
    for class_name, class_dir in zip(test_lstnames, test_types_dir)
}

type_number_df_test = pd.DataFrame.from_dict(number_per_types_test, orient='index', columns=['Quantity'])

# Preview grid: 3 random classes x 2 random test images, titled with predictions.
number_random_types = 3
number_random_per_kind = 2
image_size = 5

next_image = generate_random_image(
    test_lstnames,
    number_random_types,
    number_random_per_kind,
    test_types_dir,
    type_number_df_test,
)

plot_image(
    number_random_types,
    number_random_per_kind,
    image_size,
    next_image,
    class_names=class_names,
    model=final_model,
    get_prediction=True,
)

In [None]:
import tensorflow as tf

# Reload the saved Keras model and convert it to a TensorFlow Lite flatbuffer.
new_model = tf.keras.models.load_model(filepath='/kaggle/working/CovidNet.h5')
converter = tf.lite.TFLiteConverter.from_keras_model(new_model)
tflite_model = converter.convert()

# The context manager guarantees the file is flushed and closed.
with open("converted_model.tflite", "wb") as tflite_file:
    tflite_file.write(tflite_model)