# Transfer learning for COVID-19 Detection in X-Ray Images
## DD2424 Deep Learning - Group Project 

### Read Data

In [1]:
import matplotlib.pyplot as plt
import glob
import os
import cv2
import numpy as np

#### Filter difference between current generated dataset and paper dataset to get the correct data

In [4]:
def get_file_lines(filename):
    """Return every line of ``filename`` with surrounding whitespace stripped.

    Blank lines are kept as empty strings, so the result has one entry per
    line of the file.
    """
    with open(filename) as handle:
        return [raw_line.strip() for raw_line in handle]

def filter_out_difference(correct_data_description, wrong_data_description):
    """Keep the lines of the current split that also occur in the paper split.

    Both arguments are paths to text files read with ``get_file_lines``. A
    line of the "wrong" (current) file is kept when at least one line of the
    "correct" (paper) file occurs in it as a substring; substring matching is
    needed because the current file has longer lines.

    Args:
        correct_data_description: Path to the reference split description.
        wrong_data_description: Path to the split description to filter.

    Returns:
        The matching lines of ``wrong_data_description`` in file order, each
        at most once. Returns ``None`` (after printing how many are missing)
        when some line of the correct file is found in no line of the wrong
        file.
    """
    correct_file_lines = get_file_lines(correct_data_description)
    wrong_file_lines = get_file_lines(wrong_data_description)

    # Need to check substrings as wrong_file_description has longer lines
    nr_missing_images = sum(
        1
        for correct_line in correct_file_lines
        if not any(correct_line in line for line in wrong_file_lines)
    )

    if nr_missing_images > 0:
        print('you are missing: ', nr_missing_images, 'images')
        return None
    print('you have all images in ', correct_data_description)

    # any() stops at the first hit, so a line that several correct lines match
    # is kept once rather than once per match.
    return [
        wrong_line
        for wrong_line in wrong_file_lines
        if any(correct_line in wrong_line for correct_line in correct_file_lines)
    ]
    
def write_line_list_to_file(file, line_list):
    """Write each entry of ``line_list`` to ``file`` on its own line.

    The file is opened in ``'w'`` mode, so any existing content is replaced.
    """
    with open(file, 'w') as out:
        for entry in line_list:
            out.write("%s\n" % entry)
    


In [None]:
# Split descriptions of the paper dataset (the reference to match against).
paper_test_data_description = 'data/paper_dataset_specifications/test_COVIDx2.txt'
paper_train_data_description = 'data/paper_dataset_specifications/train_COVIDx2.txt'

# Split descriptions of the currently generated dataset.
current_test_data_description = 'data/test_split_v3.txt'
current_train_data_description = 'data/train_split_v3.txt'

# Each result holds the current-split lines that contain a paper-split line,
# or None when filter_out_difference reports missing images.
upd_test_set_list = filter_out_difference(paper_test_data_description, current_test_data_description)
upd_train_set_list = filter_out_difference(paper_train_data_description, current_train_data_description)


In [None]:
# writing updated and correct dataset description to txt file
# write_line_list_to_file('data/correct_test_split.txt',upd_test_set_list)
# write_line_list_to_file('data/correct_train_split.txt',upd_train_set_list)

In [None]:
# Moving images to class partitioned directory
# Class name as it appears in an image description -> integer class index.
mapping = {'normal': 0, 'pneumonia': 1, 'COVID-19': 2}


def get_label(img_desc_list):
    """Return the class index of the first ``mapping`` key found in ``img_desc_list``.

    Keys are tried in the order they are declared in ``mapping``. Returns
    ``None`` when no class name is present.
    """
    for class_name, class_index in mapping.items():
        if class_name in img_desc_list:
            return class_index
    return None

# Source folder of the images to move, and the prefix of the per-class target
# folders: the class index is appended to new_dir, giving e.g. ".../class0".
current_dir = 'data/dataset/excess_train'
new_dir = 'data/dataset/val/class'
img_descriptions = get_file_lines('data/old_wrong_train_split_v3.txt')

# '*g' matches every file name ending in "g" (e.g. .png, .jpg, .jpeg).
data_path = os.path.join(current_dir, '*g')
images = glob.glob(data_path)

for image_path in images:
    # basename works with whatever separator glob returned, unlike stripping
    # a hard-coded "<dir>/" prefix.
    image_name = os.path.basename(image_path)
    for img_desc in img_descriptions:
        if image_name not in img_desc:
            continue
        img_desc_list = img_desc.split()
        label = get_label(img_desc_list)
        if label is None:
            # No known class name in this description; moving the file would
            # target a "classNone" folder, so try the next description.
            continue
        os.rename(image_path, os.path.join(new_dir + str(label), image_name))
        # The file no longer exists at image_path; a second matching
        # description would make os.rename fail, so stop after the move.
        break

### Class for handling data

In [2]:
# Much of this class is unnecessary nowadays
class Dataset:
    """Load images and integer class labels for the train and test splits.

    The test split is read in one go with ``read_test_data``. The training
    split is read ``batch_size`` descriptions at a time with
    ``next_train_batch``.

    Args:
        test_img_dir: Directory holding the test images.
        train_img_dir: Directory holding the training images.
        test_img_descriptions_file: Text file with one test image description
            per line.
        train_img_descriptions_file: Text file with one training image
            description per line.
        input_shape: Target size handed to ``cv2.resize`` for every image.
        batch_size: Number of training descriptions per batch.
    """

    def __init__(
            self,
            test_img_dir,
            train_img_dir,
            test_img_descriptions_file,
            train_img_descriptions_file,
            input_shape=(224, 224),
            batch_size=10
    ):
        self.test_img_dir = test_img_dir
        self.train_img_dir = train_img_dir
        self.test_img_descriptions = get_file_lines(test_img_descriptions_file)
        self.train_img_descriptions = get_file_lines(train_img_descriptions_file)
        self.input_shape = input_shape
        self.batch_size = batch_size
        # 1-based number of the next training batch to read.
        self.batch_nr = 1
        # Number of full batches; a trailing partial batch is not counted.
        self.max_batch = len(self.train_img_descriptions) // self.batch_size
        # Class name as it appears in a description -> integer class index.
        self.mapping = {
            'normal': 0,
            'pneumonia': 1,
            'COVID-19': 2}

        self.y_train = None
        # Initialised here so the attribute exists before read_test_data().
        self.x_test = None
        self.y_test = None
        self.x_batch = None
        self.y_batch = None

    def get_current_batch(self):
        """Return the most recently loaded training batch as ``(x, y)``."""
        return self.x_batch, self.y_batch

    def _get_class_dist(self, y):
        """Return ``{class_name: count}`` for the label array ``y``."""
        return {
            class_name: np.count_nonzero(y == class_index)
            for class_name, class_index in self.mapping.items()
        }

    def get_test_class_dist(self):
        """Return the per-class sample counts of the loaded test labels."""
        return self._get_class_dist(self.y_test)

    def read_test_data(self):
        """Read all test images and labels into ``x_test`` / ``y_test``."""
        self.x_test, self.y_test = self._read_correct_images(
            self.test_img_dir, self.test_img_descriptions)

    def _get_label(self, img_desc_list):
        """Return the class index of the first known class name in the list.

        Returns ``None`` when ``img_desc_list`` contains no key of
        ``self.mapping``.
        """
        for class_name, class_index in self.mapping.items():
            if class_name in img_desc_list:
                return class_index
        return None

    # Must load training data in batches since memory error otherwise
    def next_train_batch(self):
        """Read the next training batch into ``x_batch`` / ``y_batch``.

        Batches are numbered 1..``max_batch``.

        Returns:
            ``None`` after a batch has been loaded (fetch it with
            ``get_current_batch``), or ``([], [])`` once all ``max_batch``
            batches have been consumed.
        """
        # batch_nr is the batch about to be read, so data only runs out once
        # it has moved past max_batch. Testing for equality would skip the
        # final batch and would never trigger when max_batch is 0.
        if self.batch_nr > self.max_batch:
            print('No data left')
            return [], []

        start_img = (self.batch_nr - 1) * self.batch_size
        end_img = self.batch_nr * self.batch_size
        batch_descriptions = self.train_img_descriptions[start_img:end_img]

        self.x_batch, self.y_batch = self._read_correct_images(
            self.train_img_dir, batch_descriptions)

        self.batch_nr += 1
        return None

    def _read_correct_images(self, img_dir, img_descriptions):
        """Load the images of ``img_dir`` that are named in ``img_descriptions``.

        An image is selected when its file name occurs as a substring of a
        description line; the label is taken from that line.

        Returns:
            ``(x_array, y_array)``: the resized images and their labels as
            NumPy arrays, in the order ``glob`` listed the files.
        """
        # '*g' matches every file name ending in "g" (e.g. .png, .jpg, .jpeg).
        data_path = os.path.join(img_dir, '*g')

        x_list = []
        y_list = []

        for image_path in glob.glob(data_path):
            # basename works with whatever separator glob returned, unlike
            # stripping a hard-coded "<dir>/" prefix.
            image_name = os.path.basename(image_path)
            for img_desc in img_descriptions:
                if image_name not in img_desc:
                    continue
                y_list.append(self._get_label(img_desc.split()))

                # NOTE(review): cv2.imread returns None for a file it cannot
                # decode, which makes cv2.resize raise - confirm all files
                # in img_dir are readable images.
                img_array = cv2.imread(image_path)
                x_list.append(cv2.resize(img_array, self.input_shape))

                # One sample per image file: stop at the first matching
                # description so the image is not added again.
                break

        return np.array(x_list), np.array(y_list)

### Loading data

In [5]:
# Image folders of the two splits.
train_data = 'data/dataset/train'
test_data = 'data/dataset/test'

# Split description files, one image description per line.
train_data_description = 'data/correct_train_split.txt'
test_data_description = 'data/correct_test_split.txt'

dataset = Dataset(
    test_img_dir=test_data,
    train_img_dir=train_data,
    test_img_descriptions_file=test_data_description,
    train_img_descriptions_file=train_data_description,
)

# Load the test images and labels, then show how many samples each class has.
dataset.read_test_data()

print(dataset.get_test_class_dist())

{'normal': 100, 'pneumonia': 100, 'COVID-19': 31}


In [8]:
# NOTE(review): x_train is not assigned in any cell shown here; presumably it
# was bound from a training batch in a cell that is no longer part of the
# notebook - confirm before re-running.
x_train.shape

(10, 224, 224, 3)

### Proof of concept with simple ML model

### Pretrained VGG-16

#### Importing images from folders 

In [None]:
""" Image import: This was mainly to see if I could get the model working, ImageDataGenerator might not be the best choice.
However, if we want to do data augmentation later, these function are 
built-in here. To run the script, you need 3 folder: dataset/data/train, dataset/data/val, dataset/data/test. The first two should have 
three subfolders each: class1, class2, class3 with corresponding pictures"""

import tensorflow
from tensorflow import keras
from keras import models, layers, optimizers
from keras.applications.vgg16 import preprocess_input, VGG16
from keras.preprocessing.image import ImageDataGenerator

# folders
train_dir = 'data/dataset/johanna_testdata/train'
val_dir = 'data/dataset/johanna_testdata/val'
test_dir = 'data/dataset/johanna_testdata/test'


def count_files(directory):
    """Return the number of files under ``directory``, subfolders included.

    Returns 0 when ``directory`` does not exist, because ``os.walk`` then
    yields nothing.
    """
    return sum(len(file_names) for _, _, file_names in os.walk(directory))


# constants
img_size = 150
batch_size = 16
epochs = 6
# The images live in one subfolder per class, so they have to be counted
# recursively: the files directly inside train_dir / val_dir do not include
# any image stored in a class subfolder.
nb_train_samples = count_files(train_dir)
nb_val_samples = count_files(val_dir)
print("training samples:", nb_train_samples)
print("validation samples:", nb_val_samples)

# One generator object serves all three splits; it is created with default
# arguments, i.e. no augmentation options are set.
# NOTE(review): preprocess_input is imported from keras.applications.vgg16 but
# is not passed as preprocessing_function here - confirm that is intended.
datagen = ImageDataGenerator()

# Each flow_from_directory call reads one subfolder per class and yields
# batches of (img_size x img_size) images with one-hot ('categorical') labels.
train_generator = datagen.flow_from_directory(
    train_dir,
    target_size=(img_size, img_size),
    batch_size=batch_size,
    class_mode='categorical')

val_generator = datagen.flow_from_directory(
    val_dir,
    target_size=(img_size, img_size),
    batch_size=batch_size,
    class_mode='categorical')

# shuffle must be the boolean False: the string 'false' is non-empty and
# therefore truthy, which leaves shuffling switched on and the predictions
# out of step with the file order.
test_generator = datagen.flow_from_directory(
    test_dir,
    target_size=(img_size, img_size),
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False)

#### Training and testing model with frozen weights

In [None]:
print("Freezing weights and updating last layers")
vgg_conv = VGG16(weights='imagenet', include_top=False, input_shape=(img_size, img_size, 3))

#create model and freeze all but last conv block
vgg_conv = VGG16(weights='imagenet', include_top=False, input_shape=(img_size, img_size, 3))
for layer in vgg_conv.layers[:-4]:
    layer.trainable = False

# add VGG network and extra layer to model
model = models.Sequential() 
model.add(vgg_conv)
model.add(layers.Flatten())
model.add(layers.Dense(1024, activation='relu'))
model.add(layers.Dropout(0.5))
model.add(layers.Dense(3, activation='softmax'))

# summerize and compile
model.summary()
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# fine-tune model
history = model.fit_generator(
    train_generator,
    steps_per_epoch=nb_train_samples // batch_size,
    epochs=epochs,
    validation_data=val_generator,
    validation_steps=nb_val_samples // batch_size)

# Save model and plot accuracy 
#model.save('transfer_model.h5')
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
eps = range(len(acc))
plt.plot(eps, acc, 'b', label='Training acc')
plt.plot(eps, val_acc, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()
plt.figure()
plt.show()

pred=model.predict_generator(test_generator)
print(pred)

#### Training and testing model with no frozen weights

In [None]:
# create vgg: same convolutional base as before, but no layer is frozen, so
# every weight is updated during training
vgg_conv_2 = VGG16(weights='imagenet', include_top=False, input_shape=(img_size, img_size, 3))

# add vgg network and extra layers to model: a 1024-unit dense layer with
# dropout, followed by a 3-way softmax (one output per class)
model_2 = models.Sequential()
model_2.add(vgg_conv_2)
model_2.add(layers.Flatten())
model_2.add(layers.Dense(1024, activation='relu'))
model_2.add(layers.Dropout(0.5))
model_2.add(layers.Dense(3, activation='softmax'))

# summarize and compile
model_2.summary()
model_2.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
history_2 = model_2.fit_generator(
    train_generator,
    steps_per_epoch=nb_train_samples // batch_size,
    epochs=epochs,
    validation_data=val_generator,
    validation_steps=nb_val_samples // batch_size)

# Save the model
#model_2.save('non_transfer_model.h5')
acc = history_2.history['accuracy']
val_acc = history_2.history['val_accuracy']
eps = range(len(acc))
# Open the figure before drawing; opening it after the plot calls would leave
# an extra, empty figure behind.
plt.figure()
plt.plot(eps, acc, 'b', label='Training acc')
plt.plot(eps, val_acc, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()
plt.show()

# Predict with model_2 (the network trained in this cell) and print its own
# predictions, not those of the model from the frozen-weights experiment.
pred_2 = model_2.predict_generator(test_generator)
print(pred_2)

### CNN Model from Scratch