<pre>Created by David C. Heson for the University of Alabama at Birmingham Physics REU 2023.</pre>

Website: https://cecas.clemson.edu/~ahoover/stare/
Diagnosis: 
im0057	10  7		Hypertensive Retinopathy??? OR Background Diabetic Retinopathy

![Data Example](data_ex.jpg)

Problem Statement: Say there is an issue with your scanner, and when your data should look like the image above, it just ends up looking like this:
![Distorted data :( ](distorted.jpg)

Alternative problem statement:
![Distorted data 2 :( ](distorted_2.jpg)

[Original U-Net Paper](https://arxiv.org/pdf/1505.04597.pdf)
![Original U-Net model as per the paper](og_unet_model.png)

A lot of Machine Learning (particularly in Python) is done by using pre-built models and simply adjusting them. Most tutorials that actually go through the layers are <i> very strongly </i> inspired by the tutorial on the official Keras website ([here](https://keras.io/examples/vision/oxford_pets_image_segmentation/)), which I will also be closely following regarding the construction of the model itself.

U-Net Models were designed for <i> image segmentation </i>, which means they can identify the margins of different entities within an image. Take the below example of a cat and the mask created for a cat by a U-Net model.

![Cat (credit to Keras tutorial)](cat.jpeg)
![Cat + mask (credit to Keras tutorial)](cat_mask.png)

95% of Machine Learning work, especially when it comes to image processing, is data preparation. We have to create data which contains relevant <i> information </i> for our task at hand, and also make it such that the model can parse through it efficiently and cleanly. We can also try to enhance our data set by extrapolating from existing data (within ethical and logical bounds).

First of all, we need to parse through the data set. We know the two possible diagnoses on the image (code 7 and 10), and so we can select only images from the data set which have 7, 10, or both as possible diagnoses. 

The website provides a full list of diagnoses, which we can download as a .txt. 

In [None]:
### import diagnoses codes as a .txt

import requests

def download_website(url, output_file):
    response = requests.get(url)
    
    if response.status_code == 200:
        with open(output_file, 'w') as file:
            file.write(response.text)
    else:
        print("Failed to download website.")

url = 'https://cecas.clemson.edu/~ahoover/stare/diagnoses/all-mg-codes.txt'
output_file = 'diagnoses.txt'

download_website(url, output_file)

with open('diagnoses.txt', 'r') as file:
    contents = file.read()
    print(contents)

In [None]:
### parse through downloaded .txt to find the images that correspond to our diagnoses

selected_images = []

with open('diagnoses.txt', 'r') as file:
    for line in file:
        image_info = line.split('\t')
        image_name = image_info[0]
        integers = image_info[1].split()

        if '10' in integers or '7' in integers:
            selected_images.append(image_name + '.ppm')

print(selected_images)

In [None]:
### I will be a bit inneficient here. I will now just copy over these images to a new folder.
import os
import shutil

def copy_files(file_list, source_folder, destination_folder):
    for file_name in file_list:
        source_path = os.path.join(source_folder, file_name)
        destination_path = os.path.join(destination_folder, file_name)
        
        if os.path.exists(source_path):
            shutil.copy(source_path, destination_path)
        else:
            print(f"File '{file_name}' does not exist in the source folder.")

copy_files(selected_images, 'all-images', 'selected-images')

The original U-Net Model does image segmentation, which means that it identifies regions of space which fit within specific shapes. However, with some altering of the layers the output can be designated as the image itself. 

[This](https://github.com/g2archie/UNet-MRI-Reconstruction) model does just so, created for MRI image reconstruction. We will simply use the model they created and adjust it to fit to our data.

![Modified UNet](modified_unet.png)

Now, for the model....

In [6]:
import tensorflow as tf

from tensorflow.python.keras import backend as K

### layers

def _bernoulli(shape, mean):
    return tf.nn.relu(tf.sign(mean - tf.random.uniform(shape, minval=0, maxval=1, dtype=tf.float32)))


class DropBlock2D(tf.keras.layers.Layer):
    def __init__(self, keep_prob, block_size, scale=True, **kwargs):
        super(DropBlock2D, self).__init__(**kwargs)
        self.keep_prob = float(keep_prob) if isinstance(keep_prob, int) else keep_prob
        self.block_size = int(block_size)
        self.scale = tf.constant(scale, dtype=tf.bool) if isinstance(scale, bool) else scale

    def compute_output_shape(self, input_shape):
        return input_shape

    def build(self, input_shape):
        assert len(input_shape) == 4
        _, self.h, self.w, self.channel = input_shape.as_list()
        # pad the mask
        p1 = (self.block_size - 1) // 2
        p0 = (self.block_size - 1) - p1
        self.padding = [[0, 0], [p0, p1], [p0, p1], [0, 0]]
        self.set_keep_prob()
        super(DropBlock2D, self).build(input_shape)

    def call(self, inputs, training=None, **kwargs):
        def drop():
            mask = self._create_mask(tf.shape(inputs))
            output = inputs * mask
            output = tf.cond(self.scale,
                             true_fn=lambda: output * tf.cast(tf.size(mask), tf.float32) / tf.reduce_sum(mask),
                             false_fn=lambda: output)
            return output

        if training is None:
            training = K.learning_phase()
        output = tf.cond(tf.logical_or(tf.logical_not(training), tf.equal(self.keep_prob, 1.0)),
                         true_fn=lambda: inputs,
                         false_fn=drop)
        return output

    def set_keep_prob(self, keep_prob=None):
        """This method only supports Eager Execution"""
        if keep_prob is not None:
            self.keep_prob = keep_prob
        w, h = tf.cast(self.w, tf.float32), tf.cast(self.h, tf.float32)
        self.gamma = (1. - self.keep_prob) * (w * h) / (self.block_size ** 2) / \
                     ((w - self.block_size + 1) * (h - self.block_size + 1))

    def _create_mask(self, input_shape):
        sampling_mask_shape = tf.stack([input_shape[0],
                                       self.h - self.block_size + 1,
                                       self.w - self.block_size + 1,
                                       self.channel])
        mask = _bernoulli(sampling_mask_shape, self.gamma)
        mask = tf.pad(mask, self.padding)
        mask = tf.nn.max_pool(mask, [1, self.block_size, self.block_size, 1], [1, 1, 1, 1], 'SAME')
        mask = 1 - mask
        return mask


class DropBlock3D(tf.keras.layers.Layer):
    def __init__(self, keep_prob, block_size, scale=True, **kwargs):
        super(DropBlock3D, self).__init__(**kwargs)
        self.keep_prob = float(keep_prob) if isinstance(keep_prob, int) else keep_prob
        self.block_size = int(block_size)
        self.scale = tf.constant(scale, dtype=tf.bool) if isinstance(scale, bool) else scale

    def compute_output_shape(self, input_shape):
        return input_shape

    def build(self, input_shape):
        assert len(input_shape) == 5
        _, self.d, self.h, self.w, self.channel = input_shape.as_list()
        # pad the mask
        p1 = (self.block_size - 1) // 2
        p0= (self.block_size - 1) - p1
        self.padding = [[0, 0], [p0, p1], [p0, p1], [p0, p1], [0, 0]]
        self.set_keep_prob()
        super(DropBlock3D, self).build(input_shape)

    def call(self, inputs, training=None, **kwargs):
        def drop():
            mask = self._create_mask(tf.shape(inputs))
            output = inputs * mask
            output = tf.cond(self.scale,
                             true_fn=lambda: output * tf.cast(tf.size(mask), tf.float32) / tf.reduce_sum(mask),
                             false_fn=lambda: output)
            return output

        if training is None:
            training = K.learning_phase()
        output = tf.cond(tf.logical_or(tf.logical_not(training), tf.equal(self.keep_prob, 1.0)),
                         true_fn=lambda: inputs,
                         false_fn=drop)
        return output

    def set_keep_prob(self, keep_prob=None):
        """This method only supports Eager Execution"""
        if keep_prob is not None:
            self.keep_prob = keep_prob
        d, w, h = tf.cast(self.d, tf.float32), tf.cast(self.w, tf.float32), tf.cast(self.h, tf.float32)
        self.gamma = ((1. - self.keep_prob) * (d * w * h) / (self.block_size ** 3) /
                      ((d - self.block_size + 1) * (w - self.block_size + 1) * (h - self.block_size + 1)))

    def _create_mask(self, input_shape):
        sampling_mask_shape = tf.stack([input_shape[0],
                                        self.d - self.block_size + 1,
                                        self.h - self.block_size + 1,
                                        self.w - self.block_size + 1,
                                        self.channel])
        mask = _bernoulli(sampling_mask_shape, self.gamma)
        mask = tf.pad(mask, self.padding)
        mask = tf.nn.max_pool3d(mask, [1, self.block_size, self.block_size, self.block_size, 1], [1, 1, 1, 1, 1], 'SAME')
        mask = 1 - mask
        return mask

class DropBlock2D(tf.keras.layers.Layer):
    def __init__(self, keep_prob, block_size, scale=True, **kwargs):
        super(DropBlock2D, self).__init__(**kwargs)
        self.keep_prob = float(keep_prob) if isinstance(keep_prob, int) else keep_prob
        self.block_size = int(block_size)
        self.scale = tf.constant(scale, dtype=tf.bool) if isinstance(scale, bool) else scale

    def compute_output_shape(self, input_shape):
        return input_shape

    def build(self, input_shape):
        assert len(input_shape) == 4
        _, self.h, self.w, self.channel = input_shape.as_list()
        # pad the mask
        p1 = (self.block_size - 1) // 2
        p0 = (self.block_size - 1) - p1
        self.padding = [[0, 0], [p0, p1], [p0, p1], [0, 0]]
        self.set_keep_prob()
        super(DropBlock2D, self).build(input_shape)

    def call(self, inputs, training=None, **kwargs):
        def drop():
            mask = self._create_mask(tf.shape(inputs))
            output = inputs * mask
            output = tf.cond(self.scale,
                             true_fn=lambda: output * tf.cast(tf.size(mask), tf.float32) / tf.reduce_sum(mask),
                             false_fn=lambda: output)
            return output

        if training is None:
            training = K.learning_phase()
        output = tf.cond(tf.logical_or(tf.logical_not(training), tf.equal(self.keep_prob, 1.0)),
                         true_fn=lambda: inputs,
                         false_fn=drop)
        return output

    def set_keep_prob(self, keep_prob=None):
        """This method only supports Eager Execution"""
        if keep_prob is not None:
            self.keep_prob = keep_prob
        w, h = tf.cast(self.w, tf.float32), tf.cast(self.h, tf.float32)
        self.gamma = (1. - self.keep_prob) * (w * h) / (self.block_size ** 2) / \
                     ((w - self.block_size + 1) * (h - self.block_size + 1))

    def _create_mask(self, input_shape):
        sampling_mask_shape = tf.stack([input_shape[0],
                                       self.h - self.block_size + 1,
                                       self.w - self.block_size + 1,
                                       self.channel])
        mask = _bernoulli(sampling_mask_shape, self.gamma)
        mask = tf.pad(mask, self.padding)
        mask = tf.nn.max_pool(mask, [1, self.block_size, self.block_size, 1], [1, 1, 1, 1], 'SAME')
        mask = 1 - mask
        return mask


class DropBlock3D(tf.keras.layers.Layer):
    def __init__(self, keep_prob, block_size, scale=True, **kwargs):
        super(DropBlock3D, self).__init__(**kwargs)
        self.keep_prob = float(keep_prob) if isinstance(keep_prob, int) else keep_prob
        self.block_size = int(block_size)
        self.scale = tf.constant(scale, dtype=tf.bool) if isinstance(scale, bool) else scale

    def compute_output_shape(self, input_shape):
        return input_shape

    def build(self, input_shape):
        assert len(input_shape) == 5
        _, self.d, self.h, self.w, self.channel = input_shape.as_list()
        # pad the mask
        p1 = (self.block_size - 1) // 2
        p0= (self.block_size - 1) - p1
        self.padding = [[0, 0], [p0, p1], [p0, p1], [p0, p1], [0, 0]]
        self.set_keep_prob()
        super(DropBlock3D, self).build(input_shape)

    def call(self, inputs, training=None, **kwargs):
        def drop():
            mask = self._create_mask(tf.shape(inputs))
            output = inputs * mask
            output = tf.cond(self.scale,
                             true_fn=lambda: output * tf.cast(tf.size(mask), tf.float32) / tf.reduce_sum(mask),
                             false_fn=lambda: output)
            return output

        if training is None:
            training = K.learning_phase()
        output = tf.cond(tf.logical_or(tf.logical_not(training), tf.equal(self.keep_prob, 1.0)),
                         true_fn=lambda: inputs,
                         false_fn=drop)
        return output

    def set_keep_prob(self, keep_prob=None):
        """This method only supports Eager Execution"""
        if keep_prob is not None:
            self.keep_prob = keep_prob
        d, w, h = tf.cast(self.d, tf.float32), tf.cast(self.w, tf.float32), tf.cast(self.h, tf.float32)
        self.gamma = ((1. - self.keep_prob) * (d * w * h) / (self.block_size ** 3) /
                      ((d - self.block_size + 1) * (w - self.block_size + 1) * (h - self.block_size + 1)))

    def _create_mask(self, input_shape):
        sampling_mask_shape = tf.stack([input_shape[0],
                                        self.d - self.block_size + 1,
                                        self.h - self.block_size + 1,
                                        self.w - self.block_size + 1,
                                        self.channel])
        mask = _bernoulli(sampling_mask_shape, self.gamma)
        mask = tf.pad(mask, self.padding)
        mask = tf.nn.max_pool3d(mask, [1, self.block_size, self.block_size, self.block_size, 1], [1, 1, 1, 1, 1], 'SAME')
        mask = 1 - mask
        return mask    
    
### model

class UNet2D2D(tf.keras.Model):

    def __init__(self, regularization=None, regularization_parameters=None):
        super().__init__()
        self.depth = 5
        self.regularization = regularization
        self.regularization_parameters = regularization_parameters
        self.kernel_regularizer = None
        if regularization is not None:
            if regularization == 'l2':
                self.kernel_regularizer = tf.keras.regularizers.l2(*regularization_parameters)
            elif regularization == 'l1':
                self.kernel_regularizer = tf.keras.regularizers.l1(*regularization_parameters)
            elif regularization == 'l1_l2':
                self.kernel_regularizer = tf.keras.regularizers.l1_l2(*regularization_parameters)

        self.conv3d_1_1 = tf.keras.layers.Conv3D(filters=1, kernel_size=3, strides=1, padding='same', name='conv3d_1_1',
                                                 kernel_regularizer=self.kernel_regularizer)

    def _add_regularization_layer(self, input_layer, name_suffix, input_type='2d', activation='relu'):

        regularization_layer = None

        if self.regularization == 'batch_norm':
            layer_name = "Batch_Norm_" + name_suffix
            if hasattr(self, layer_name):
                batch_norm_layer = getattr(self, layer_name)
            else:
                batch_norm_layer = tf.keras.layers.BatchNormalization(-1, name=layer_name)
                setattr(self, layer_name, batch_norm_layer)
            regularization_layer = batch_norm_layer(input_layer)

        elif self.regularization == 'instance_norm':
            layer_name = "Instance_Norm_" + name_suffix
            if hasattr(self, layer_name):
                instance_norm_layer = getattr(self, layer_name)
            else:
                instance_norm_layer = InstanceNormalization(-1, name=layer_name)
                setattr(self, layer_name, instance_norm_layer)
            regularization_layer = instance_norm_layer(input_layer)

        elif self.regularization == 'dropout':
            layer_name = "Dropout_" + name_suffix
            if hasattr(self, layer_name):
                dropout_layer = getattr(self, layer_name)
            else:
                dropout_layer = tf.keras.layers.Dropout(*self.regularization_parameters, name=layer_name)
                setattr(self, layer_name, dropout_layer)
            regularization_layer = dropout_layer(input_layer)

        elif self.regularization == 'dropblock':
            if input_type == '1d':
                return input_layer
            elif input_type == '2d':
                layer_name = "DropBlock_" + name_suffix
                if hasattr(self, layer_name):
                    dropblock_layer = getattr(self, layer_name)
                else:
                    dropblock_layer = DropBlock2D(*self.regularization_parameters, name=layer_name)
                    setattr(self, layer_name, dropblock_layer)
                regularization_layer = dropblock_layer(input_layer)

            elif input_type == '3d':
                layer_name = "DropBlock_" + name_suffix
                if hasattr(self, layer_name):
                    dropblock_layer = getattr(self, layer_name)
                else:
                    dropblock_layer = DropBlock3D(*self.regularization_parameters, name=layer_name)
                    setattr(self, layer_name, dropblock_layer)
                regularization_layer = dropblock_layer(input_layer)

        if regularization_layer is not None:
            output = tf.keras.layers.Activation(activation=activation)(regularization_layer)
        else:
            output = tf.keras.layers.Activation(activation=activation)(input_layer)
        return output

    def _get_convolution_block(self, input_layer, filters, kernel_size=3, strides=1, padding='same',
                               name_prefix='l_', activation='relu'):

        in_b, in_w, in_h, in_t, in_c = input_layer.get_shape().as_list()
        permute_layer_name_1 = name_prefix + "Permute_{}_1".format(filters)
        if hasattr(self, permute_layer_name_1):
            permute_layer_1 = getattr(self, permute_layer_name_1)
        else:
            permute_layer_1 = tf.keras.layers.Permute((2, 1, 3, 4), name=permute_layer_name_1)
            setattr(self, permute_layer_name_1, permute_layer_1)

        permute_layer_1 = permute_layer_1(input_layer)

        reshape_layer_1 = tf.reshape(permute_layer_1, shape=(-1,
                                                             in_w, in_t,
                                                             in_c))

        conv2d_layer_name_1 = name_prefix + "Conv2D_{}_1".format(filters)
        if hasattr(self, conv2d_layer_name_1):
            conv2d_1 = getattr(self, conv2d_layer_name_1)
        else:
            conv2d_1 = tf.keras.layers.Conv2D(filters=filters, kernel_size=kernel_size, strides=strides,
                                              padding=padding,name=conv2d_layer_name_1,
                                              kernel_regularizer=self.kernel_regularizer, data_format='channels_last')
            setattr(self, conv2d_layer_name_1, conv2d_1)

        conv2d_1 = conv2d_1(reshape_layer_1)
        conv2d_1 = self._add_regularization_layer(conv2d_1, name_suffix=conv2d_layer_name_1, activation=activation)

        reshape_layer_2 = tf.reshape(conv2d_1, shape=(-1, in_h,
                                                      in_w, in_t, filters))

        permute_layer_name_2 = name_prefix + "Permute_{}_2".format(filters)
        if hasattr(self, permute_layer_name_2):
            permute_layer_2 = getattr(self, permute_layer_name_2)
        else:
            permute_layer_2 = tf.keras.layers.Permute((2, 1, 3, 4), name=permute_layer_name_2)
            setattr(self, permute_layer_name_2, permute_layer_2)
        permute_layer_2 = permute_layer_2(reshape_layer_2)

        reshape_layer_3 = tf.keras.backend.reshape(permute_layer_2, shape=(-1, in_h,
                                                                           in_t, filters))
        conv2d_layer_name_2 = name_prefix + "Conv2D_{}_2".format(filters)
        if hasattr(self, conv2d_layer_name_2):
            conv2d_2 = getattr(self, conv2d_layer_name_2)
        else:
            conv2d_2 = tf.keras.layers.Conv2D(filters=filters, kernel_size=kernel_size, strides=strides, padding=padding,
                                              name=conv2d_layer_name_2,
                                              kernel_regularizer=self.kernel_regularizer, data_format='channels_last')
            setattr(self, conv2d_layer_name_2, conv2d_2)

        conv2d_2 = conv2d_2(reshape_layer_3)
        conv2d_2 = self._add_regularization_layer(conv2d_2, name_suffix=conv2d_layer_name_2, activation=activation)

        reshape_layer_4 = tf.keras.backend.reshape(conv2d_2, shape=(-1, in_w, in_h, in_t, filters))

        return reshape_layer_4

    def _get_convolution_transpose_layer(self, input_layer, filters, kernel_size=3, strides=(2, 2, 1), padding='same',
                                         name_prefix='r_', activation='relu'):

        conv3d_transpose_layer_name = name_prefix + "UpConv3D_{}".format(filters)
        if hasattr(self, conv3d_transpose_layer_name):
            conv3d_transpose = getattr(self, conv3d_transpose_layer_name)
        else:
            conv3d_transpose = tf.keras.layers.Convolution3DTranspose(filters=filters, kernel_size=kernel_size,
                                                                      strides=strides, padding=padding,
                                                                      name=conv3d_transpose_layer_name,
                                                                      kernel_regularizer=self.kernel_regularizer)

            setattr(self, conv3d_transpose_layer_name, conv3d_transpose)
        conv3d_transpose = conv3d_transpose(input_layer)
        conv3d_transpose = self._add_regularization_layer(conv3d_transpose, name_suffix=conv3d_transpose_layer_name,
                                                          input_type='3d', activation=activation)
        return conv3d_transpose

    def _get_max_pool_3d_layer(self, filters, pool_size=(2, 2, 1), strides=(2, 2, 1), padding='same',
                               name_prefix='l_'):

        maxpool_3d_layer_name = name_prefix + "MaxPool3D_{}".format(filters)
        if hasattr(self, maxpool_3d_layer_name):
            maxpool_3d = getattr(self, maxpool_3d_layer_name)
        else:
            maxpool_3d = tf.keras.layers.MaxPooling3D(pool_size=pool_size, strides=strides, padding=padding,
                                                      name=maxpool_3d_layer_name)
            setattr(self, maxpool_3d_layer_name, maxpool_3d)
        return maxpool_3d

    def call(self, inputs):
        inputs = tf.keras.backend.expand_dims(inputs, axis=-1)
        current_layer = inputs
        level = []

        for depth in range(self.depth):
            filters = 32 * 2 ** depth
            first_layer = self._get_convolution_block(input_layer=current_layer, filters=filters)
            if depth < self.depth - 1:
                current_layer = self._get_max_pool_3d_layer(filters=filters)(first_layer)
                level.append([first_layer, current_layer])
            else:
                current_layer = first_layer
                level.append([first_layer])

        for depth in range(self.depth - 2, -1, -1):
            filters = 32 * 2 ** depth
            up_convolution_layer = self._get_convolution_transpose_layer(input_layer=current_layer, filters=filters,
                                                                         kernel_size=2)
            concat_layer = tf.keras.layers.concatenate([level[depth][0], up_convolution_layer], axis=-1)
            current_layer = self._get_convolution_block(input_layer=concat_layer, filters=filters, name_prefix='r_')

        conv3d_1_1 = self.conv3d_1_1(current_layer)
        conv3d_1_1 = self._add_regularization_layer(conv3d_1_1, name_suffix='conv3d_1_1', input_type='3d',
                                                    activation='relu')

        sum = tf.keras.layers.add([conv3d_1_1, inputs])
        update = tf.keras.activations.relu(sum)
        output = tf.keras.backend.squeeze(update, -1)
        return output


In [3]:
from PIL import Image

image_path = "data_ex.ppm"  
image = Image.open(image_path)

print(image.size)

(700, 605)


In [8]:
### code here won't work

for training_path in training_paths:
    print(training_path)
    for task_name, path in get_subdirectories(training_path):
        print(task_name)
        for dirpath, dirnames, filenames in os.walk(path):
            for filename in [f for f in filenames if f.endswith(".h5")]:
                training_result = os.path.join(dirpath, filename)
                metric = calculate_metrics(training_result)
                result.append((task_name, metric))
                print()