# Things to install


## Colab


In [None]:
!apt update & & apt install - y openslide-tools
!pip install openslide-python


In [1]:
# Colab only: mount Google Drive under /content/drive.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


## Local


In [None]:
# Local setup: Python packages installed for this notebook.
!pip install openslide-python
!pip install opencv-python
!pip install imgaug
!pip install scikit-learn


In [None]:
# NOTE(review): scikit-learn is already installed by the previous cell; this cell looks redundant.
!pip install scikit-learn


In [None]:
# The path can also be read from a config file, etc.
# OPENSLIDE_PATH is an absolute path on the author's machine; adjust it before
# running this cell elsewhere.
import os
OPENSLIDE_PATH = r'C:\Users\sofia\openslide-win64-20230414\openslide-win64-20230414\bin'

if hasattr(os, 'add_dll_directory'):
    # Python >= 3.8 on Windows
    # The directory is registered only for the duration of the import.
    with os.add_dll_directory(OPENSLIDE_PATH):
        import openslide
else:
    # os.add_dll_directory is not available: import without registering the folder.
    import openslide


# Code


# Patch extraction from WSI


## - Import & constants


In [None]:
import concurrent.futures
import glob
import numpy as np
from thread import process_svs_file
import threading
import openslide
import cv2
from matplotlib import pyplot as plt
import xml.etree.ElementTree as ET

# Folders (relative to the notebook) holding the .svs slides and their .xml annotations.
path_to_images = "../slides/"
path_to_annotations = "../annotations/"
# Width and height, in slide pixels, of each tile cut from a slide.
el_width = 2000
el_height = 2000
# Width and height each tile (and its mask) is resized to before being stored.
output_width = 1000
output_height = 1000


## - Function


In [None]:
def get_annotatios(file_path):
    """Read every annotation polygon from an XML annotation file.

    Args:
        file_path: Path (or file object) handed to ``ET.parse``.

    Returns:
        A list with one dict per ``Annotation`` element, in document order:
        ``{'name': <value of the Name attribute>, 'coordinates': [(x, y), ...]}``
        where each pair is built from the ``X``/``Y`` attributes of the
        ``Coordinate`` elements found under that annotation, converted to float.
    """
    xml_root = ET.parse(file_path).getroot()
    return [
        {
            'name': node.get('Name'),
            'coordinates': [
                (float(vertex.get('X')), float(vertex.get('Y')))
                for vertex in node.iter('Coordinate')
            ],
        }
        for node in xml_root.iter('Annotation')
    ]


In [None]:
def is_mostly_white(image, threshold_w=0.85, threshold_p=0.98):
    """Tell whether an image consists almost entirely of white pixels.

    Args:
        image: Image passed to ``cv2.cvtColor`` with ``COLOR_BGR2GRAY``.
        threshold_w: Fraction of 255 from which a gray value counts as white.
        threshold_p: Fraction of white pixels from which the image counts as white.

    Returns:
        ``(is_white, white_fraction)``: a bool and the fraction of white pixels.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Gray level at or above which a pixel is considered white.
    cutoff = int(threshold_w * 255)

    pixel_count = grayscale.shape[0] * grayscale.shape[1]
    white_fraction = np.sum(grayscale >= cutoff) / pixel_count

    return bool(white_fraction >= threshold_p), white_fraction


In [None]:
def get_labels(labels, annotations):
    """Draw every annotation polygon onto ``labels`` with value 1 (in place).

    Args:
        labels: Array handed to ``cv2.fillPoly`` as the drawing target.
        annotations: Iterable of dicts with a ``'coordinates'`` list of (x, y)
            pairs; the pairs are converted to int32 before drawing.
    """
    for entry in annotations:
        vertices = entry['coordinates']
        cv2.fillPoly(labels, np.array([vertices], dtype=np.int32), 1)


In [None]:
def plt_image(image, labes):
    """Show a label mask and its image side by side, without axes.

    Args:
        image: Image converted with ``COLOR_BGR2RGB`` before display.
        labes: Label mask shown in the left panel.
    """
    _, (label_ax, image_ax) = plt.subplots(1, 2)
    # Left panel: labels.
    label_ax.imshow(labes)
    label_ax.axis('off')
    # Right panel: image.
    image_ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    image_ax.axis('off')
    plt.show()


In [None]:
def _append_patch_if_tissue(slide_pixels, label_image, x, y, el_width, el_height,
                            output_size, dataset, labels):
    """Append the resized tile at (x, y) and its mask unless the tile is mostly white.

    Returns True when the tile was kept, False when it was discarded.
    """
    region = slide_pixels[y: y + el_height, x: x + el_width]
    image = cv2.cvtColor(region, cv2.COLOR_RGBA2BGR)
    is_white, _ = is_mostly_white(image)
    if is_white:
        return False
    dataset.append(cv2.resize(
        image, output_size, interpolation=cv2.INTER_CUBIC))
    # NOTE(review): the mask is resized with the same cubic interpolation as the
    # image; nearest-neighbour is the usual choice for label masks — confirm.
    labels.append(cv2.resize(
        label_image[y: y + el_height, x: x + el_width], output_size,
        interpolation=cv2.INTER_CUBIC))
    return True


def extrapolate_patches(wsi, annotation, el_width, el_height, output_width, output_height):
    """Cut a slide into tiles and return the non-white ones with their label masks.

    The slide is split into a grid of ``el_width`` x ``el_height`` tiles. Every
    tile that is not mostly white is resized to ``output_width`` x
    ``output_height`` and stored together with the matching crop of the label
    mask. For kept tiles outside the last row and column, three extra tiles
    shifted by half a tile (horizontally, vertically, diagonally) are
    considered as well. Progress is appended to ``../log/thread_<name>.txt``.

    Args:
        wsi: Slide object exposing ``dimensions`` and ``read_region``.
        annotation: Path of the XML annotation file for the slide.
        el_width, el_height: Tile size in slide pixels.
        output_width, output_height: Size each stored tile is resized to.

    Returns:
        ``(dataset, labels)``: two lists of arrays in matching order.
    """
    w, h = wsi.dimensions
    label_image = np.zeros((h, w), dtype=np.uint8)
    get_labels(label_image, get_annotatios(annotation))

    # Number of whole tiles that fit in the slide.
    num_rows = h // el_height
    num_cols = w // el_width
    output_size = (output_width, output_height)
    # Offsets of the extra overlapping tiles: horizontal, vertical, diagonal.
    half_offsets = (
        (el_width // 2, 0),
        (0, el_height // 2),
        (el_width // 2, el_height // 2),
    )

    dataset = []
    labels = []

    thread_name = threading.current_thread().name
    # The context manager closes the log even when reading or tiling fails.
    with open("../log/thread_" + thread_name + ".txt", "a") as file:
        file.write("Sto per leggere il file wsi\n")

        # The whole slide is read into memory in a single call.
        slide_pixels = np.array(wsi.read_region((0, 0), 0, (w, h)))

        file.write("File letto\n")
        for row in range(num_rows):
            for col in range(num_cols):
                x = col * el_width
                y = row * el_height
                kept = _append_patch_if_tissue(
                    slide_pixels, label_image, x, y, el_width, el_height,
                    output_size, dataset, labels)
                if not kept:
                    continue
                # Tiles on the last row/column have no room for shifted neighbours.
                if col == num_cols - 1 or row == num_rows - 1:
                    continue
                for dx, dy in half_offsets:
                    _append_patch_if_tissue(
                        slide_pixels, label_image, x + dx, y + dy, el_width,
                        el_height, output_size, dataset, labels)

        file.write("Wsi elaborato\nDataset di dimensione:" +
                   str(np.array(dataset).shape) + "\nLabels di dimensione:" + str(np.array(labels).shape))
    return dataset, labels


In [None]:
def process_svs_file(svs_file, path_to_annotations, path_to_images, el_width, el_height, output_width, output_height):
    """Extract the patches of one slide, save them as .npy files and return them.

    Args:
        svs_file: Path of the slide; expected to start with ``path_to_images``
            and end with a 4-character extension such as ``.svs``.
        path_to_annotations: Folder prefix of the XML annotation files.
        path_to_images: Folder prefix used to derive the slide name.
        el_width, el_height, output_width, output_height: Forwarded to
            ``extrapolate_patches``.

    Returns:
        ``(d, l)``: the patch list and the label list of the slide.
    """
    thread_name = threading.current_thread().name
    # Slide name without the folder prefix and without the extension.
    slide_name = svs_file[len(path_to_images):-4]
    # Append mode: pool threads are reused for several slides, so exclusive
    # creation ("x") raised FileExistsError on the second slide of a thread.
    with open("../log/thread_" + thread_name + ".txt", "a") as file:
        file.write("Sono il thread" + thread_name + "\n")
        file.write("Sto elaborando il file " + slide_name + "\n")
    # Matching annotation file.
    annotation = path_to_annotations + slide_name + ".xml"
    wsi = openslide.OpenSlide(svs_file)
    try:
        d, l = extrapolate_patches(
            wsi, annotation, el_width, el_height, output_width, output_height)
    finally:
        # Release the slide handle even when the extraction fails.
        wsi.close()
    np.save('../slides/' + slide_name + '.npy', np.array(d))
    np.save('../annotations/' + slide_name + '_label.npy', np.array(l))
    return d, l


## - Code


In [None]:
# Accumulators for the patches and masks of all slides.
dataset, labels = [], []

# Start the job log; mode "x" fails if the file already exists.
with open("../log/job.txt", "x") as file:
    file.write("Il task è iniziato\n")

# All .svs files found in the slides folder.
svs_files = glob.glob(path_to_images + "*.svs")

# Thread pool that runs the per-slide jobs.
num_threads = 9  # number of worker threads
executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)


In [None]:
# Futures returned by the asynchronous submissions.
futures = []
# The job log stays open: the next cell writes the final message and closes it.
file = open("../log/job.txt", "a")
file.write("Lancio i thread\n")

# Run process_svs_file in parallel, one job per slide.
for svs_file in svs_files:
    # The slide path is passed exactly once: process_svs_file takes seven
    # arguments, and a duplicated svs_file made every job fail with TypeError.
    future = executor.submit(process_svs_file, svs_file, path_to_annotations,
                             path_to_images, el_width, el_height, output_width, output_height)
    futures.append(future)

file.write("Aspetto la fine dei thread\n")
# Wait for every job to finish.
concurrent.futures.wait(futures)

file.write("I thread hanno finito, concateno i risultati\n")
# Gather the results; result() re-raises any exception raised inside a job.
dataset = []
labels = []
for future in futures:
    d, l = future.result()
    dataset.extend(d)
    labels.extend(l)


In [None]:
# Convert the collected patch and mask lists to arrays and save them.
dataset = np.array(dataset)
labels = np.array(labels)

np.save('../slides/dataset.npy', dataset)
np.save('../annotations/labels.npy', labels)

# `file` is the job log opened in the previous cell; it is closed here.
file.write("Risultati salvati\n")

file.close()


# Preprocessing data and data augmentation


## - Import and constants


In [2]:
import imgaug.augmenters as iaa
import tensorflow as tf
from imgaug.augmentables.segmaps import SegmentationMapsOnImage
from sklearn.model_selection import train_test_split
import numpy as np

# .npy files (on the mounted Google Drive) with the patches and labels of one slide.
path_to_dataset = "/content/drive/MyDrive/Polito/MLinAP/RECHERCHE-015.npy"
path_to_labels = "/content/drive/MyDrive/Polito/MLinAP/RECHERCHE-015_label.npy"


## - Function


In [3]:
def data_augment():
    """Build the imgaug pipeline used to augment the training set.

    Returns:
        An ``iaa.Sequential`` that applies the augmenters below in random order.
    """
    augmenters = [
        # Drop a random share (0 to 5%) of the pixels.
        iaa.Dropout((0, 0.05)),
        # Rotate by an angle between -30 and 30 degrees.
        iaa.Affine(rotate=(-30, 30)),
        # Horizontal flip with probability 0.5.
        iaa.Fliplr(0.5),
        # Random crop of up to 20%, resized back to the original size.
        iaa.Crop(percent=(0, 0.2), keep_size=True),
        # Add a value between -50 and 50 to the brightness-related channels.
        iaa.WithBrightnessChannels(iaa.Add((-50, 50))),
        # Blend with a grayscale version, removing 0 to 50% of the colour.
        iaa.Grayscale(alpha=(0.0, 0.5)),
        # Gamma contrast with gamma between 0.5 and 2.0, sampled per channel.
        iaa.GammaContrast((0.5, 2.0), per_channel=True),
        # Local distortions obtained by moving points around.
        iaa.PiecewiseAffine(scale=(0.01, 0.1)),
    ]
    return iaa.Sequential(augmenters, random_order=True)


In [4]:
def process_data(image, label):
    """Scale an image to float32 values divided by 255 and one-hot encode its label.

    Returns:
        ``(image, label)`` where the label is one-hot encoded over 2 classes
        along a new last axis.
    """
    scaled_image = tf.cast(image, tf.float32) / 255
    one_hot_label = tf.one_hot(label, 2, name="label", axis=-1)
    return scaled_image, one_hot_label


In [5]:
def data_aug_impl(dataset, image_train, label_train):
    """Append one augmented copy of the training set to the training set.

    Args:
        dataset: Kept for backward compatibility; no longer read. The image
            shape used to be taken from ``dataset[1]``, which raised IndexError
            when the dataset held a single sample.
        image_train: Array of training images.
        label_train: Array of label masks, one per training image.

    Returns:
        ``(image_train, label_train)`` with the augmented samples appended
        after the original ones along axis 0.
    """
    da = data_augment()
    # Each mask is tied to the shape of the image it belongs to.
    segmented_label_train = [
        SegmentationMapsOnImage(label, shape=image.shape)
        for image, label in zip(image_train, label_train)
    ]
    augmented_images, augmented_labels = da(
        images=image_train.copy(), segmentation_maps=segmented_label_train)
    image_train = np.append(image_train, augmented_images, axis=0)
    label_train = np.append(label_train, np.array(
        [label.get_arr() for label in augmented_labels]), axis=0)

    return image_train, label_train


In [6]:
def generate_train_data_tensor(image_train, label_train):
    """Wrap the training arrays in a tf.data pipeline.

    The pipeline maps ``process_data`` over the samples, caches the result,
    shuffles with a buffer of 100, batches by 128 and prefetches.

    Returns:
        The resulting ``tf.data.Dataset``.
    """
    samples = tf.data.Dataset.from_tensor_slices((image_train, label_train))
    return (
        samples
        .map(process_data, num_parallel_calls=tf.data.AUTOTUNE)
        .cache()
        .shuffle(100)
        .batch(128)
        .prefetch(tf.data.AUTOTUNE)
    )


## - Code


In [7]:
print("Loading dataset and labels")

# Only the first two samples of each file are kept here.
dataset = np.load(path_to_dataset)[0:2]
labels = np.load(path_to_labels)[0:2]

print(
    f"Dataset and labels loaded\nDataset shape {dataset.shape} \nLabels shape {labels.shape}")


Loading dataset and labels
Dataset and labels loaded
Dataset shape (2, 512, 512, 3) 
Labels shape (2, 512, 512)


In [8]:
# Hold out 25% of the samples as the test set (fixed seed for reproducibility).
image_train, image_test, label_train, label_test = train_test_split(
    dataset, labels, test_size=0.25, random_state=42)

# One shape pair per line; the last value is the label shape of the test set.
print("Dataset and labels splitted in train and test set\n" +
      f"image_train shape {image_train.shape} - label_train shape {label_train.shape}\n" +
      f"image_test shape {image_test.shape} - label_test shape {label_test.shape}")


Dataset and labels splitted in train and test set
image_train shape (1, 512, 512, 3) - label_train shape (1, 512, 512)image_test shape (1, 512, 512, 3) - image_test shape (1, 512, 512)


In [9]:
# Append the augmented samples to the training set; this rebinds image_train and
# label_train, so re-running the cell augments the already augmented set.
image_train, label_train = data_aug_impl(dataset, image_train, label_train)

print("Applied data agumentation to train set\n" +
      f"image_train augmented shape {image_train.shape} - label_train augmented shape {label_train.shape}")


Applied data agumentation to train set
image_train augmented shape (2, 512, 512, 3) - label_train augmented shape (2, 512, 512)


In [10]:
# Build the tf.data pipeline from the (augmented) training arrays.
train_data = generate_train_data_tensor(image_train, label_train)

print("Preprocessed and created tensor dataset")


Preprocessed and created tensor dataset


# SegNet && Unet


## - Import and constants


In [11]:
import tensorflow as tf
from keras.models import Model
from keras.layers import BatchNormalization
from keras.layers.convolutional import Conv2D
from tensorflow.keras.layers import Activation
from keras import backend as K
from keras.layers import Layer
from keras.layers.convolutional import Conv2D, BatchNormalization
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
import json

# Number of segmentation classes produced by the models.
NUM_CLASSES = 2
# Input image shape and its spatial size.
INPUT_SHAPE = (512, 512, 3)
IMAGE_SIZE = (512, 512)


## - Class


## segnet

In [12]:
from keras import backend as K
from keras.layers import Layer


class MaxPoolingWithArgmax2D(Layer):
    """Max pooling layer that also returns the argmax indices of the pooled values.

    Args:
        pool_size: Pooling window as (height, width).
        strides: Pooling strides as (height, width).
        padding: Padding mode name; upper-cased before being passed to TensorFlow.
    """

    def __init__(self, pool_size=(2, 2), strides=(2, 2), padding="same", **kwargs):
        super(MaxPoolingWithArgmax2D, self).__init__(**kwargs)
        self.padding = padding
        self.pool_size = pool_size
        self.strides = strides

    def call(self, inputs, **kwargs):
        """Return ``[pooled, argmax]``; ``argmax`` is cast to the Keras float type."""
        if K.backend() != "tensorflow":
            errmsg = "{} backend is not supported for layer {}".format(
                K.backend(), type(self).__name__
            )
            raise NotImplementedError(errmsg)
        ksize = [1, self.pool_size[0], self.pool_size[1], 1]
        strides = [1, self.strides[0], self.strides[1], 1]
        # Use the imported `tf` module directly (as MaxUnpooling2D does) instead
        # of reaching it through the Keras backend module.
        output, argmax = tf.nn.max_pool_with_argmax(
            inputs, ksize=ksize, strides=strides, padding=self.padding.upper()
        )
        argmax = K.cast(argmax, K.floatx())
        return [output, argmax]

    def compute_output_shape(self, input_shape):
        """Halve the two middle dimensions; both outputs share the same shape."""
        # NOTE(review): the ratio is fixed to 2 and does not follow `strides`.
        ratio = (1, 2, 2, 1)
        output_shape = tuple(
            dim // ratio[idx] if dim is not None else None
            for idx, dim in enumerate(input_shape)
        )
        return [output_shape, output_shape]

    def compute_mask(self, inputs, mask=None):
        """No mask is propagated for either output."""
        return 2 * [None]


class MaxUnpooling2D(Layer):
    """Scatter pooled values back to the positions stored in an index tensor.

    Args:
        size: Upsampling factors applied to dimensions 1 and 2 of the input.
    """

    def __init__(self, size=(2, 2), **kwargs):
        super(MaxUnpooling2D, self).__init__(**kwargs)
        self.size = size

    def call(self, inputs, output_shape=None):
        """Unpool ``inputs[0]`` using the indices in ``inputs[1]``.

        Args:
            inputs: Pair ``[updates, mask]``: the values to place and their indices.
            output_shape: Optional target shape; by default dimensions 1 and 2 of
                ``updates`` are multiplied by ``self.size``.

        Returns:
            A tensor of ``output_shape`` produced by ``tf.scatter_nd``.
        """
        updates, mask = inputs[0], inputs[1]
        mask = tf.cast(mask, "int32")
        input_shape = tf.shape(updates, out_type="int32")
        if output_shape is None:
            output_shape = (
                input_shape[0],
                input_shape[1] * self.size[0],
                input_shape[2] * self.size[1],
                input_shape[3],
            )
        # The shape is stored on the layer instance on every call.
        self.output_shape1 = output_shape

        # Build the four index components (b, y, x, f) for every value.
        # NOTE(review): the decoding below assumes `mask` holds, for each sample,
        # indices flattened as (y * width + x) * channels + c — confirm against
        # the layer that produces it.
        one_like_mask = tf.ones_like(mask, dtype="int32")
        batch_shape = tf.concat([[input_shape[0]], [1], [1], [1]], axis=0)
        batch_range = tf.reshape(
            tf.range(output_shape[0], dtype="int32"), shape=batch_shape
        )
        b = one_like_mask * batch_range
        y = mask // (output_shape[2] * output_shape[3])
        x = (mask // output_shape[3]) % output_shape[2]
        feature_range = tf.range(output_shape[3], dtype="int32")
        f = one_like_mask * feature_range

        # Flatten values and indices, then scatter them into the output tensor.
        updates_size = tf.size(updates)
        indices = tf.transpose(tf.reshape(
            tf.stack([b, y, x, f]), [4, updates_size]))
        values = tf.reshape(updates, [updates_size])
        ret = tf.scatter_nd(indices, values, output_shape)
        return ret

    def compute_output_shape(self, input_shape):
        """Return the shape of the index input with dimensions 1 and 2 scaled by ``size``."""
        mask_shape = input_shape[1]
        return (
            mask_shape[0],
            mask_shape[1] * self.size[0],
            mask_shape[2] * self.size[1],
            mask_shape[3],
        )


In [None]:
def _segnet_encoder_block(vgg19, layer_names):
    """Stack the named VGG19 convolutions, each followed by BatchNorm and ReLU."""
    stack = []
    for layer_name in layer_names:
        stack += [vgg19.get_layer(layer_name),
                  BatchNormalization(),
                  Activation('relu')]
    return tf.keras.Sequential(stack)


def _segnet_decoder_block(filters_per_conv):
    """Stack 3x3 ReLU convolutions (one per filter count), each followed by BatchNorm and ReLU."""
    stack = []
    for filters in filters_per_conv:
        stack += [Conv2D(filters=filters, activation='relu', kernel_size=(3, 3),
                         kernel_initializer='he_normal', padding='same'),
                  BatchNormalization(),
                  Activation('relu')]
    return tf.keras.Sequential(stack)


class SegNet(Model):
    """SegNet-style encoder/decoder built on the convolutions of a VGG19.

    The encoder reuses the VGG19 convolution layers (ImageNet weights) and pools
    with ``MaxPoolingWithArgmax2D``; the decoder unpools with ``MaxUnpooling2D``
    using the stored indices, in reverse order, and ends with a 1x1 softmax
    convolution.

    Args:
        num_classes: Number of output channels of the final softmax layer.
        input_shape: Input shape given to the VGG19 constructor.
    """

    def __init__(self, num_classes=NUM_CLASSES, input_shape=INPUT_SHAPE):
        super().__init__()
        vgg19 = tf.keras.applications.vgg19.VGG19(
            include_top=False,   # Exclusion of the last 3 layers
            weights='imagenet',
            input_shape=input_shape,
            pooling='max',
            classes=num_classes,
            classifier_activation='relu'
        )
        # Encoder: blocks 1-5, each followed by pooling that keeps the indices.
        self.b1 = _segnet_encoder_block(
            vgg19, ['block1_conv1', 'block1_conv2'])
        self.b1p = MaxPoolingWithArgmax2D(name="layerMP1")
        self.b2 = _segnet_encoder_block(
            vgg19, ['block2_conv1', 'block2_conv2'])
        self.b2p = MaxPoolingWithArgmax2D(name="layerMP2")
        self.b3 = _segnet_encoder_block(
            vgg19, ['block3_conv1', 'block3_conv2', 'block3_conv3', 'block3_conv4'])
        self.b3p = MaxPoolingWithArgmax2D(name="layerMP3")
        self.b4 = _segnet_encoder_block(
            vgg19, ['block4_conv1', 'block4_conv2', 'block4_conv3', 'block4_conv4'])
        self.b4p = MaxPoolingWithArgmax2D(name="layerMP4")
        self.b5 = _segnet_encoder_block(
            vgg19, ['block5_conv1', 'block5_conv2', 'block5_conv3', 'block5_conv4'])
        self.b5p = MaxPoolingWithArgmax2D(name="layerMP5")

        # Decoder: blocks 6-10, each preceded by an unpooling layer. The last
        # convolution of a block outputs the channel count of the next indices.
        self.b6p = MaxUnpooling2D()
        self.b6 = _segnet_decoder_block([512, 512, 512, 512])
        self.b7p = MaxUnpooling2D()
        self.b7 = _segnet_decoder_block([512, 512, 512, 256])
        self.b8p = MaxUnpooling2D()
        self.b8 = _segnet_decoder_block([256, 256, 256, 128])
        self.b9p = MaxUnpooling2D()
        self.b9 = _segnet_decoder_block([128, 64])
        self.b10p = MaxUnpooling2D()
        self.b10 = _segnet_decoder_block([64, 64])
        # The classifier honours the num_classes argument (it previously read
        # the module-level NUM_CLASSES and ignored the argument).
        self.o = Conv2D(filters=num_classes, kernel_size=(
            1, 1), padding='valid', activation='softmax')

    def call(self, input):
        """Run the encoder, then the decoder with the matching pooling indices."""
        # Encoder
        x = self.b1(input)
        x, index1 = self.b1p(x)
        x = self.b2(x)
        x, index2 = self.b2p(x)
        x = self.b3(x)
        x, index3 = self.b3p(x)
        x = self.b4(x)
        x, index4 = self.b4p(x)
        x = self.b5(x)
        x, index5 = self.b5p(x)
        # Decoder: indices are consumed in reverse order.
        x = self.b6p([x, index5])
        x = self.b6(x)
        x = self.b7p([x, index4])
        x = self.b7(x)
        x = self.b8p([x, index3])
        x = self.b8(x)
        x = self.b9p([x, index2])
        x = self.b9(x)
        x = self.b10p([x, index1])
        x = self.b10(x)
        o = self.o(x)
        return o


## unet

In [None]:
import tensorflow as tf
from keras.models import Model
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.layers.merging import concatenate


def _unet_conv3x3(filters):
    """Return a 3x3 same-padded ReLU convolution with He-normal initialisation."""
    return Conv2D(filters=filters, activation='relu', kernel_size=(
        3, 3), kernel_initializer='he_normal', padding='same')


def _unet_upsample(filters):
    """Return a 3x3 transposed ReLU convolution with stride 2 and same padding."""
    return Conv2DTranspose(
        filters, (3, 3), activation="relu", strides=(2, 2), padding='same')


class Unet(Model):
    """U-Net-style model whose contracting path reuses VGG19 layers.

    Blocks 1-5 use the first two convolutions (and the pooling layers) of the
    VGG19 blocks with ImageNet weights; blocks 6-11 form the expanding path with
    transposed convolutions and skip connections to blocks 5-1.

    Args:
        dropout_value: Currently unused (the dropout layers are disabled).
        num_classes: Number of output channels of the final softmax layer.
        input_shape: Input shape given to the VGG19 constructor.
    """

    def __init__(self, dropout_value=0.3, num_classes=2, input_shape=(512, 512, 3)):
        super().__init__()
        vgg19 = tf.keras.applications.vgg19.VGG19(
            include_top=False,   # Exclusion of the last 3 layers
            weights='imagenet',
            input_shape=input_shape,
            pooling='max',
            classes=num_classes,
            classifier_activation='relu'
        )
        # Block 1
        self.b1c1 = vgg19.get_layer('block1_conv1')
        self.b1c2 = vgg19.get_layer('block1_conv2')
        # Block 2
        self.b2p = vgg19.get_layer('block1_pool')
        self.b2c1 = vgg19.get_layer('block2_conv1')
        self.b2c2 = vgg19.get_layer('block2_conv2')
        # Block 3
        self.b3p = vgg19.get_layer('block2_pool')
        self.b3c1 = vgg19.get_layer('block3_conv1')
        self.b3c2 = vgg19.get_layer('block3_conv2')
        # Block 4
        self.b4p = vgg19.get_layer('block3_pool')
        self.b4c1 = vgg19.get_layer('block4_conv1')
        self.b4c2 = vgg19.get_layer('block4_conv2')
        # Block 5
        self.b5p = vgg19.get_layer('block4_pool')
        self.b5c1 = vgg19.get_layer('block5_conv1')
        self.b5c2 = vgg19.get_layer('block5_conv2')
        # Block 6 (bottleneck). Dropout is not part of the original U-Net; it
        # can be added around these convolutions if overfitting is observed.
        self.b6p = vgg19.get_layer('block5_pool')
        self.b6c1 = _unet_conv3x3(1024)
        self.b6c2 = _unet_conv3x3(1024)
        self.b6u = _unet_upsample(512)
        # Blocks 7-10: convolutions applied after each concatenation, then upsampling.
        self.b7c1 = _unet_conv3x3(512)
        self.b7c2 = _unet_conv3x3(512)
        self.b7u = _unet_upsample(512)
        self.b8c1 = _unet_conv3x3(512)
        self.b8c2 = _unet_conv3x3(512)
        self.b8u = _unet_upsample(256)
        self.b9c1 = _unet_conv3x3(256)
        self.b9c2 = _unet_conv3x3(256)
        self.b9u = _unet_upsample(128)
        self.b10c1 = _unet_conv3x3(128)
        self.b10c2 = _unet_conv3x3(128)
        self.b10u = _unet_upsample(64)
        # Block 11: final convolutions and the per-pixel classifier.
        self.b11c1 = _unet_conv3x3(64)
        self.b11c2 = _unet_conv3x3(64)
        self.b11c3 = _unet_conv3x3(64)
        # The classifier honours the num_classes argument (it was hard-coded to 2).
        self.b11s = Conv2D(num_classes, (1, 1), activation='softmax')

    def call(self, input):
        """Run the contracting path, then the expanding path with skip connections."""
        # Block 1
        r1 = self.b1c2(self.b1c1(input))
        # Block 2
        r2 = self.b2c2(self.b2c1(self.b2p(r1)))
        # Block 3
        r3 = self.b3c2(self.b3c1(self.b3p(r2)))
        # Block 4
        r4 = self.b4c2(self.b4c1(self.b4p(r3)))
        # Block 5
        r5 = self.b5c2(self.b5c1(self.b5p(r4)))
        # Block 6
        r6 = self.b6u(self.b6c2(self.b6c1(self.b6p(r5))))
        # Block 7
        r7 = concatenate([r6, r5])
        r7 = self.b7u(self.b7c2(self.b7c1(r7)))
        # Block 8
        r8 = concatenate([r7, r4])
        r8 = self.b8u(self.b8c2(self.b8c1(r8)))
        # Block 9
        r9 = concatenate([r8, r3])
        r9 = self.b9u(self.b9c2(self.b9c1(r9)))
        # Block 10
        r10 = concatenate([r9, r2])
        r10 = self.b10u(self.b10c2(self.b10c1(r10)))
        # Block 11
        r11 = concatenate([r10, r1])
        r11 = self.b11c3(self.b11c2(self.b11c1(r11)))
        out = self.b11s(r11)
        return out


## - Code


In [None]:
def generate_data_tensor(image_train, label_train, train=True,
                         batch_size=128, shuffle_buffer=500):
    """Wrap paired images and labels into a batched, prefetched `tf.data.Dataset`.

    Args:
        image_train: iterable of images; each element must match the declared
            signature `(512, 512, 3)` / `tf.float32`.
        label_train: iterable of label masks aligned with `image_train`; each
            element must match `(512, 512)` / `tf.int32`.
        train: when True the dataset is shuffled before batching.
        batch_size: number of samples per batch (default 128, the value that
            was previously hard-coded).
        shuffle_buffer: size of the shuffle buffer used when `train` is True
            (default 500, the value that was previously hard-coded).

    Returns:
        A `tf.data.Dataset` whose elements are batches produced by
        `process_data` applied to each (image, label) pair.
    """
    def generator():
        # Lazily pair every image with its label; zip stops at the shorter input.
        yield from zip(image_train, label_train)

    data = tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(512, 512, 3), dtype=tf.float32),
            tf.TensorSpec(shape=(512, 512), dtype=tf.int32)))
    data = data.map(process_data, num_parallel_calls=tf.data.AUTOTUNE)
    # Cached after the map, so process_data runs once per element.
    # NOTE(review): if process_data is randomised, its output is frozen by this
    # cache after the first pass — confirm that is intended.
    data = data.cache()
    if train:
        data = data.shuffle(shuffle_buffer)
    data = data.batch(batch_size)
    data = data.prefetch(tf.data.AUTOTUNE)
    return data

In [None]:
# NumPy file holding the image array; it is read with np.load and used as the
# model input data.
path_to_dataset = "../dataset/slides/dataset.npy"
# NumPy file holding the label array that is split together with the images.
path_to_labels = "../dataset/annotations/labels.npy"

In [None]:
# Open the training log in append mode: exclusive-create mode ("x") raises
# FileExistsError as soon as this cell is run a second time, which breaks
# "Restart & Run All". The handle is kept open on purpose because later cells
# keep writing to `file`.
file = open("../log/training.txt", "a")
file.write("Loading dataset and labels\n")
dataset = np.load(path_to_dataset)
labels = np.load(path_to_labels)
# Every message ends with a newline so consecutive entries do not run together.
file.write(
    f"Dataset and labels loaded\nDataset shape {dataset.shape} \nLabels shape {labels.shape}\n")
# Push the entries to disk now so they survive a crash in a later cell.
file.flush()

In [None]:
# First split: 70% train, 30% held out. Second split: the held-out part is
# divided into validation (67%) and test (33%). Fixed seeds keep the splits
# reproducible.
image_train, image_vt, label_train, label_vt = train_test_split(
    dataset, labels, test_size=0.30, random_state=42)
image_validation, image_test, label_validation, label_test = train_test_split(
    image_vt, label_vt, test_size=0.33, random_state=42)

# Each fragment ends with a newline so the log stays one entry per line, and
# the label shape is reported under its own name.
file.write("Dataset and labels splitted in train and test set\n" +
           f"image_train shape {image_train.shape} - label_train shape {label_train.shape}\n" +
           f"image_test shape {image_test.shape} - label_test shape {label_test.shape}\n")

# NOTE(review): the full `dataset` (including validation/test samples) is passed
# as the first argument — confirm data_aug_impl does not draw augmented samples
# from it, otherwise held-out data leaks into the training set.
image_train, label_train = data_aug_impl(dataset, image_train, label_train)

file.write("Applied data agumentation to train set\n" +
           f"image_train augmented shape {image_train.shape} - label_train augmented shape {label_train.shape}\n")
file.flush()

train_data = generate_data_tensor(image_train, label_train)
# Evaluation sets do not need shuffling.
validation_data = generate_data_tensor(
    image_validation, label_validation, train=False)
# The test set must be paired with its labels (the images were previously
# passed twice, which does not match the label signature).
test_data = generate_data_tensor(image_test, label_test, train=False)

In [None]:
def train_and_evaluate_model(train_data, validation_data, learning_rate):
    """Train a fresh SegNet with the given learning rate and score it on the validation set.

    Args:
        train_data: dataset passed to `model.fit`.
        validation_data: dataset passed to `model.evaluate`.
        learning_rate: learning rate handed to the Adam optimizer.

    Returns:
        A tuple `(model, accuracy, history)`: the trained model, the value of
        the "accuracy" metric on `validation_data`, and the object returned by
        `model.fit`.
    """
    # Build the model
    model = SegNet()
    # Compile the model with the loss function and optimizer.
    # NOTE(review): "cross_entropy" does not look like a registered Keras loss
    # identifier, and "cohen_kappa" / "MatthewsCorrelationCoefficient" appear to
    # be add-on metrics that may not resolve from a plain string — confirm these
    # names against the installed Keras version and the label encoding produced
    # by process_data before relying on this cell.
    model.compile(loss="cross_entropy", optimizer=tf.keras.optimizers.Adam(learning_rate), metrics=["accuracy", "F1Score", "Precision", "Recall", "FalseNegatives",
                                                                                               "FalsePositives", "TrueNegatives", "TruePositives",
                                                                                               "cohen_kappa", "MatthewsCorrelationCoefficient"])
    # Train the model on the training set
    history = model.fit(train_data, epochs=10)
    # Evaluate the model on the validation set. evaluate() reports the loss plus
    # one value per compiled metric, so a two-name unpack cannot work; read the
    # accuracy by name instead.
    results = model.evaluate(validation_data, return_dict=True)
    accuracy = results["accuracy"]
    return model, accuracy, history

# Trackers for the best run of the learning-rate sweep. `best_model` is
# initialised under the same name the loop assigns to, so it is always defined.
best_accuracy = 0.0
best_learning_rate = None
best_model = None
best_history = None

learning_rates = [0.001, 0.01, 0.1]

# Evaluate every learning rate and keep the best one
for learning_rate in learning_rates:
    # The helper needs the datasets as well as the learning rate.
    model, accuracy, history = train_and_evaluate_model(
        train_data, validation_data, learning_rate)
    print(f"Learning Rate: {learning_rate}, Accuracy: {accuracy}")
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_learning_rate = learning_rate
        best_model = model
        best_history = history


print(f"Best Learning Rate: {best_learning_rate}, Best Accuracy: {best_accuracy}")