Filip Jurić <fijur20@student.sdu.dk>

Lucas Olai Jarlkov Olsen <luols17@student.sdu.dk>

Martin Kristian Lorenzen <marlo17@student.sdu.dk>

Sebastian Eklund Larsen <selar16@student.sdu.dk>

Youssouf Souare <yosou20@student.sdu.dk>


# Deep Learning Project 2


## Task 0 - Data prep

Download and extract dataset.

In [None]:
"""
DATASET DOWNLOAD
"""
import os.path

DATASET = "pneumonia_project_2020_2.zip"
URL = "https://nextcloud.sdu.dk/index.php/s/s7bA69T4Stog6wo/download"

# Download only when the archive is not already present.
if not os.path.isfile(DATASET):
    import requests

    print("Downloading file...")
    # Stream into a temporary file and rename it once complete, so that an
    # interrupted download never leaves a truncated archive which a later run
    # would mistake for a finished one and skip.
    partial_path = DATASET + ".part"
    with requests.get(URL, stream=True, timeout=60) as response:
        # Fail loudly on HTTP errors instead of saving an error page as a zip.
        response.raise_for_status()
        with open(partial_path, "wb") as archive_file:
            for chunk in response.iter_content(chunk_size=1 << 20):
                archive_file.write(chunk)
    os.replace(partial_path, DATASET)
else:
    print("Skipping download...")

print("Done!")

In [None]:
"""
OVERLY COMPLICATED DATASET EXTRACTION
"""
EXTRACT = False  # Manual toggle for when rerunning
RANDOM_SEED = 42

# Top-level folders that a previous extraction may have created.
DATA_FOLDERS = ["training", "evaluation"]

if EXTRACT:
    import os
    import shutil
    from zipfile import ZipFile

    print("Extracing dataset...")

    # Remove any previous extractions; a folder that does not exist yet is fine.
    for data_folder in DATA_FOLDERS:
        try:
            shutil.rmtree(data_folder)
        except FileNotFoundError:
            pass

    with ZipFile(DATASET, 'r') as archive:
        for info in archive.infolist():
            if info.is_dir():  # Skip directories
                continue
            if info.filename.endswith(".pdf"):  # Skip PDFs
                continue
            # Identify if Filtered (training data) or Encoded (evaluation data)
            folder = "other"
            if info.filename.startswith("project_2020/filtered/"):
                folder = "training"
            elif info.filename.startswith("project_2020/encoded"):
                folder = "evaluation"
            # The label is the name of the directory holding the file
            label = info.filename.split('/')[-2]
            # Strip the archive path so the file is written directly into
            # "<folder>/<label>/" rather than recreating the zip's own tree.
            info.filename = os.path.basename(info.filename)
            archive.extract(info, f"{folder}/{label}")
else:
    print("Skiping dataset extraction...")
print("Done!")

In [None]:
"""
Import Tensorflow
"""

import tensorflow as tf

## Task 1 - Layers

To create your model, you will need different layers. You are forbidden from using the built-in dense, convolution, and pooling layers. Instead, you must implement these 3 layers on your own. You can use the code you have developed for class. A good place to start off is always the Keras documentation. [You can view the documentation on custom layers by clicking HERE](https://keras.io/layers/writing-your-own-keras-layers/).

You must present three classes, Dense, Convolution2D, and Pooling.

You can add additional parameters to these layers as you might need them, such as stride, kernel size, different types of pooling such as average pooling, max pooling, etc.

You are allowed to use all backend functions like `K.dot(x, y)` and `K.conv2d(x, kernel)` etc.

In [None]:
"""
Layer Imports
"""
import math

from tensorflow.keras import Model
from tensorflow.keras import activations
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer
import numpy as np

### Dense Layer

In [None]:
"""
Dense Layer Implementation
"""

#might want to change name to something unique
class CustomDense(Layer):
    """Fully connected layer computing ``activation(dot(x, w) + b)``.

    Args:
        units: Number of output units (size of the last output axis).
        activation: Activation name or callable, resolved through
            ``activations.get``; ``None`` is passed through unchanged to it.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = activations.get(activation)

    def build(self, input_shape):
        """Create the weight matrix and bias once the input size is known."""
        # K.dot contracts the last axis of the input, so size the weights by it.
        self.w = self.add_weight(
            name="weights",
            shape=(input_shape[-1], self.units),
            initializer="uniform",
            trainable=True,
        )
        self.b = self.add_weight(
            name="bias",
            shape=(self.units,),
            initializer="zeros",
            trainable=True,
        )
        super().build(input_shape)

    def compute_output_shape(self, input_shape):
        """Return the input shape with its last axis replaced by ``units``."""
        return tuple(input_shape[:-1]) + (self.units,)

    def call(self, x):
        """Apply the affine transform followed by the activation."""
        return self.activation(K.dot(x, self.w) + self.b)

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            "units": self.units,
            # Store the serialized form rather than the function object, so
            # the config can be written out and fed back to ``__init__``.
            "activation": activations.serialize(self.activation),
        })
        return config
    

### Convolution Layer

In [None]:
"""
Convolution Layer Implementation Goes Here
"""


class Convolution2D(Layer):
    """2D convolution layer (no bias term) built on ``K.conv2d``.

    Inputs are expected to be 4D; the kernel is sized from the last input
    axis, i.e. the layer assumes channels-last data.

    Args:
        filters: Number of output channels.
        kernel_size: ``(rows, cols)`` of the kernel, or one int used for both.
        strides: ``(rows, cols)`` stride, or one int used for both.
        activation: Activation name or callable, resolved through
            ``activations.get``.
        padding: Padding mode handed to ``K.conv2d`` ('valid' or 'same').
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, filters, kernel_size, strides=(1, 1), activation=None, padding='valid', **kwargs):
        # Initialise the base class first so that attributes it manages
        # (such as the input spec) are not reset after we assign them.
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = self._as_pair(kernel_size)
        self.strides = self._as_pair(strides)
        self.activation = activations.get(activation)
        self.padding = padding
        self.input_spec = tf.keras.layers.InputSpec(ndim=4)

    @staticmethod
    def _as_pair(value):
        """Return ``value`` as a 2-tuple; a single int is used for both axes."""
        if isinstance(value, int):
            return (value, value)
        return tuple(value)

    def _output_length(self, length, kernel, stride):
        """Return the size of one spatial axis after the convolution."""
        if length is None:  # unknown dimension stays unknown
            return None
        if self.padding == 'same':
            return -(-length // stride)  # ceil(length / stride)
        return (length - kernel) // stride + 1

    def build(self, input_shape):
        """Create the trainable kernel once the channel count is known."""
        in_channels = int(input_shape[-1])
        self.kernel = self.add_weight(
            name='kernel',
            shape=(*self.kernel_size, in_channels, self.filters),
            initializer='glorot_uniform',
            trainable=True,
        )
        super().build(input_shape)  # Be sure to call this at the end

    def call(self, inputs, **kwargs):
        """Convolve ``inputs`` with the kernel and apply the activation."""
        convolved = K.conv2d(
            inputs,
            self.kernel,
            strides=self.strides,
            padding=self.padding,
        )
        return self.activation(convolved)

    def compute_output_shape(self, input_shape):
        """Return ``(batch, out_rows, out_cols, filters)`` for the given input."""
        batch, rows, cols = tf.TensorShape(input_shape).as_list()[:3]
        return (
            batch,
            self._output_length(rows, self.kernel_size[0], self.strides[0]),
            self._output_length(cols, self.kernel_size[1], self.strides[1]),
            self.filters,
        )

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            "filters": self.filters,
            "kernel_size": self.kernel_size,
            "strides": self.strides,
            # Serialized form, so the config round-trips through __init__.
            "activation": activations.serialize(self.activation),
            "padding": self.padding,
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from ``get_config`` output.

        The config holds constructor arguments only; the kernel weights are
        created in ``build`` and are not part of the config.
        """
        return cls(**config)

### Pooling Layer

In [None]:
""""
Pooling Layer Implementation
"""
from tensorflow.keras import backend as K
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model,load_model

class Pooling2d(Layer):
    """2D pooling layer built on ``K.pool2d``.

    Args:
        pool_size: ``(rows, cols)`` of the pooling window.
        strides: ``(rows, cols)`` stride; defaults to ``pool_size`` when None.
        padding: Padding mode handed to ``K.pool2d`` ("valid" or "same").
        pool_mode: Pooling mode handed to ``K.pool2d`` (default "max").
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, pool_size=(2,2), strides=None, padding="valid", pool_mode="max", **kwargs):
        super().__init__(**kwargs)
        self.pool_size = pool_size
        self.padding = padding
        self.pool_mode = pool_mode
        self.strides = pool_size if strides is None else strides

    def _output_length(self, length, window, stride):
        """Return the size of one spatial axis after pooling."""
        if length is None:  # unknown dimension stays unknown
            return None
        if self.padding == "same":
            return (length - 1) // stride + 1
        return (length - window) // stride + 1  # padding == valid

    def build(self, input_shape):
        """Nothing to create: the layer has no weights."""
        super().build(input_shape)

    def compute_output_shape(self, input_shape):  # assumes channels last
        """Return the ``(batch, out_rows, out_cols, channels)`` output shape."""
        batch, rows, cols, channels = tf.TensorShape(input_shape).as_list()
        out_rows = self._output_length(rows, self.pool_size[0], self.strides[0])
        out_cols = self._output_length(cols, self.pool_size[1], self.strides[1])
        return tf.TensorShape((batch, out_rows, out_cols, channels))

    def call(self, inp):
        """Pool ``inp`` with the configured window, stride, padding and mode."""
        return K.pool2d(
            inp,
            self.pool_size,
            self.strides,
            self.padding,
            pool_mode=self.pool_mode,
        )

    def get_config(self):
        """Return the constructor arguments needed to rebuild this layer."""
        config = super().get_config()
        config.update({
            "pool_size": self.pool_size,
            "strides": self.strides,
            "padding": self.padding,
            "pool_mode": self.pool_mode,
        })
        return config


## Task 2 - Model

You must now build your model. To do this, use your own layers, as well as the other layers from the Keras library except the three forbidden layers (Dense, Convolution2D, pooling). This means you can use for example Flatten from the Keras library.

You must now build your model. To do this, use your own layers, as well as the other layers from the Keras library except the three forbidden layers (Dense, Convolution2D, pooling). This means you can use for example Flatten from the Keras library.

Try to make your model as good as possible. A quick list of things you could try:
* Adding different kinds of regularization
* Adding different kinds of data augmentation
* Varying your model parameters such as kernel size, amount of units, activation functions
* The structure of your model. Make it deeper and slimmer, make it more shallow but wider

Especially the last suggestion can give large gains. Here are a couple of papers you can read, if you want to know the state-of-the-art networks:

* [InceptionV3](https://arxiv.org/abs/1512.00567)
* [DenseNet](https://arxiv.org/abs/1608.06993)

It is important that you resize your images to 224x224, in order for your network to work for task 3.



In [None]:
"""
Build Datagenerator
"""
IMG_WIDTH = 224
IMG_HEIGHT = 224
IMG_CHANNELS = 1
# Keras expects image sizes as (height, width) and channels-last input as
# (height, width, channels). Both sides are 224 here, so the values are
# unchanged; the order only matters if the sides ever differ.
IMG_SIZE = (IMG_HEIGHT, IMG_WIDTH)
INPUT_SHAPE = (IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)


VALIDATION_SPLIT = 0.2


def _load_split(subset):
    """Load one side ("training" or "validation") of the ``training/`` split.

    Both sides go through this single call site so they always share the same
    seed and split fraction.
    """
    return tf.keras.preprocessing.image_dataset_from_directory(
        "training/",
        label_mode="binary",
        color_mode="grayscale",
        image_size=IMG_SIZE,
        seed=RANDOM_SEED,
        validation_split=VALIDATION_SPLIT,
        subset=subset,
    )


train_dataset = _load_split("training")

validation_dataset = _load_split("validation")

In [None]:
"""
Build & Compile Model
"""

INITIAL_EPOCH = 0

# TODO (2) Switch to our custom layers (Dense, Convolution & Pooling)
# NOTE(review): `MaxPooling2D` and `Dense` are not defined by the custom-layer
# cells; presumably they resolve through the
# `from tensorflow.keras.layers import *` in the pooling cell. That star
# import may also rebind `Convolution2D` - confirm which class is used here.

model = tf.keras.models.Sequential()

# Scale raw pixel values by 1/255 inside the model itself.
model.add(tf.keras.layers.experimental.preprocessing.Rescaling(1./255.0, input_shape=INPUT_SHAPE))

# Convolution stage 1: 32 filters
model.add(Convolution2D(filters=32, kernel_size=(3, 3), activation='relu'))
model.add(tf.keras.layers.BatchNormalization())
model.add(MaxPooling2D())
model.add(tf.keras.layers.Dropout(0.2))

# Convolution stage 2: 64 filters
model.add(Convolution2D(filters=64, kernel_size=(3, 3), activation='relu'))
model.add(tf.keras.layers.BatchNormalization())
model.add(MaxPooling2D())
model.add(tf.keras.layers.Dropout(0.2))

# Convolution stage 3: 128 filters, no pooling, spatial dropout instead
model.add(Convolution2D(filters=128, kernel_size=(3, 3), activation='relu'))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.SpatialDropout2D(0.2))

# Classifier head: one hidden layer, then a single sigmoid output unit
model.add(tf.keras.layers.Flatten())
model.add(Dense(256, activation='relu'))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))


model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)


In [None]:
"""
Train Model
"""
# Train one more epoch than has been run so far; rerunning this cell
# continues from where the previous run stopped.
EPOCHS = INITIAL_EPOCH + 1

now = "run"

# Checkpoint callback; "{epoch}" stays a literal placeholder in the path
# template. `save_best_only`/`mode` and a TensorBoard callback are not enabled.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    "checkpoints/" + now + "{epoch}.h5",
    monitor='val_accuracy',
)
callbacks = [checkpoint_callback]

fit_options = dict(
    epochs=EPOCHS,
    initial_epoch=INITIAL_EPOCH,
    callbacks=callbacks,
    validation_data=validation_dataset,
    verbose=1,
)
history = model.fit(train_dataset, **fit_options)

# Remember how far training has progressed for the next run of this cell.
INITIAL_EPOCH = EPOCHS

In [None]:
# Look at data & pick how many epochs (tensorboard is better)

from matplotlib import pyplot as plt

fig = plt.figure(figsize=plt.figaspect(0.3))

# One panel per metric:
# (training key, validation key, training label, validation label, y label)
panels = (
    ('loss', 'val_loss', 'Training loss', 'Validation loss', 'Loss'),
    ('accuracy', 'val_accuracy', 'Training accuracy', 'Validation accuracy', 'Accuracy'),
)

for position, (train_key, val_key, train_label, val_label, y_label) in enumerate(panels, start=1):
    ax = fig.add_subplot(1, 2, position)
    ax.plot(history.history[train_key], label=train_label)
    ax.plot(history.history[val_key], label=val_label)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    ax.legend()

plt.show()

In [None]:
# Save the current `model` to 'model.h5' (path relative to the working directory).
model.save('model.h5')

## Task 3 - Evaluation

For this last task, you’re given a new dataset that you must evaluate your model on. The new dataset is different from the one you’ve been training on. You will find it in the `evaluation` directory.

The new dataset consists of 30 normal cases, and 21 pneumonia cases. This time, they’re not in a jpeg format, but are txt files. Each file contains a 224x224 matrix of the pixel values of an image. As the matrix is 2D, we only have 1 color channel. If your model was trained on 3 color channels, you must convert this monochrome 1-channel data to a 3-channel RGB similar to your training data.

To do this, you are required to write a custom Keras generator class. Once again, I recommend you check the docs. [You can see the Sequence base class HERE](https://keras.io/utils/).


In [None]:
"""
Create Evaluation Data Generator
"""
import tensorflow as ts
import math

class Generator(ts.keras.utils.Sequence):
    """Batch generator over the text-encoded evaluation images.

    Reads ``<directory>/PNEUMONIA`` (label 1) and ``<directory>/NORMAL``
    (label 0). Each file is parsed with ``np.loadtxt`` when its batch is
    requested.

    Args:
        batch_size: Number of files per batch (the last batch may be smaller).
        directory: Folder containing the ``PNEUMONIA`` and ``NORMAL``
            sub-folders.
        channels: Number of channels in the produced images. The 2D matrix
            read from each file is given a trailing channel axis and, for
            ``channels > 1``, repeated along it.
    """

    def __init__(self, batch_size=32, directory="evaluation", channels=1):
        super().__init__()
        self.batch_size = batch_size
        self.channels = channels

        self.file_list = []
        self.class_list = []

        # PNEUMONIA files first, then NORMAL; names are sorted because
        # os.listdir gives no ordering guarantee and predictions are later
        # compared against `class_list` position by position.
        for class_name, label in (("PNEUMONIA", 1), ("NORMAL", 0)):
            class_dir = os.path.join(directory, class_name)
            for file_name in sorted(os.listdir(class_dir)):
                self.file_list.append(os.path.join(class_dir, file_name))
                self.class_list.append(label)

    def __len__(self):
        """Return the number of batches, counting a final partial batch."""
        return math.ceil(len(self.file_list) / self.batch_size)

    def __getitem__(self, idx):
        """Return ``(images, labels)`` for batch number ``idx``."""
        start = idx * self.batch_size
        stop = start + self.batch_size

        images = np.array([np.loadtxt(path) for path in self.file_list[start:stop]])
        # Add the trailing channel axis so a batch is (batch, rows, cols, channels).
        images = np.expand_dims(images, axis=-1)
        if self.channels > 1:
            images = np.repeat(images, self.channels, axis=-1)

        labels = np.array(self.class_list[start:stop])
        return images, labels
    
# Evaluation data source, built with the generator's default batch size (32).
evaluation_dataset = Generator()

In [None]:
"""
Evaluate model
"""
# Evaluate on the custom evaluation generator first, then on the validation
# split for comparison.
# NOTE(review): neither return value is stored; in a notebook presumably only
# the second call's result is echoed as the cell output - confirm that the
# progress output of the first call is sufficient.
model.evaluate(evaluation_dataset)
model.evaluate(validation_dataset)

In [None]:
## Confusion Matrix and Classification  Report 

from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score,\
cohen_kappa_score, accuracy_score, classification_report

# Ground-truth labels, in the same order the generator yields its files.
y_true = evaluation_dataset.class_list

# Model outputs, rounded to hard 0/1 class predictions.
y_pred = model.predict(evaluation_dataset)
y_pred_r = np.round(y_pred)

print('Confusion Matrix')
matrix = confusion_matrix(y_true, y_pred_r)
print(matrix)


print('classification Report')
target_name = ['NORMAL', 'PNEUMONIA']
report = classification_report(y_true, y_pred_r, target_names=target_name)
print(report)