#Import Data and and libraries

In [None]:
from google.colab import drive
import os
drive.mount("/gdrive", force_remount=True)

In [None]:
!nvidia-smi -L

In [None]:
%load_ext tensorboard

In [None]:
#hier path zum https://www.kaggle.com/aryashah2k/breast-ultrasound-images-dataset Datensatz einfügen(archive.zip)
#dieser Datensatz sollte im google drive gespeichert sein, da das einlesen der Bilder andernfalls Ewigkeiten dauert
#im Ordner models werden die einzelnen Modelle gespeichert
drive_path_history = "/gdrive/MyDrive/Marcel_Moczarski/Universtiy/Semester/SS_2021/ML/models/" 
drive_path = "/gdrive/MyDrive/Marcel_Moczarski/Universtiy/Semester/SS_2021/ML/Code/archive.zip"

local_path = "/content"

In [None]:
#damit die Bilddaten schneller eingelesen werden, wird die archive.zip in die lokale Umgebung kopiert

In [None]:
!cp '{drive_path}' .

In [None]:
os.chdir(local_path)

In [None]:
!unzip -q 'archive.zip'

In [None]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.cm as cm

import glob
import cv2 
import os

import tensorflow as tf
from sklearn.model_selection import train_test_split
from keras.utils.vis_utils import plot_model

from tqdm import tqdm
from keras import backend as K

In [None]:
#Metrik und Loss-Funktion
def jaccard_coef(y_true, y_pred):
  y_true_f = K.flatten(y_true)
  y_pred_f = K.flatten(y_pred)
  intersect = K.sum(y_true_f * y_pred_f)
  return (intersect + 1.0) / (K.sum(y_true_f) + K.sum(y_pred_f) - intersect * 1.0)

def jaccard_coef_loss(y_true, y_pred):
  return -jaccard_coef(y_true, y_pred)

In [None]:
seed = 1337
epochs = 400

In [None]:
main_path = "Dataset_BUSI_with_GT/"
sub_path = os.listdir(main_path)

img_height = 128
img_width = 128
img_channel = 1

In [None]:
#Einlesen der Bilder
list_img = []
list_mask = []
label = [0, 1, 2]
Y = []
for sub, l  in tqdm(zip(sub_path, label), total=len(sub_path)):
    items_img = sorted(glob.glob(main_path + sub + "/*).png"))
    items_mask = sorted(glob.glob(main_path + sub + "/*_mask*.png"))
    for img in items_img:
        tmp_img = cv2.imread(img, cv2.IMREAD_GRAYSCALE)
        tmp_img = cv2.resize(tmp_img, (img_height, img_width))
        list_img.append(tmp_img)
        Y.append(l)
    for img in items_mask:
        tmp_img = cv2.imread(img, cv2.IMREAD_GRAYSCALE)
        tmp_img = cv2.resize(tmp_img, (img_height, img_width))
        if(img[-5] != "k"):
            tmp_img = (list_mask[-1] + tmp_img).clip(0, 255)
            list_mask[-1] = tmp_img
        else:
            list_mask.append(tmp_img)  

In [None]:
#Skalierung der Bilder
data_img = np.array(list_img)/255.
data_mask = np.array(list_mask)/255.
data_Y = np.array(Y)

In [None]:
X_train, X_test, y_train, y_test, y_class_train, y_class_test = train_test_split(
    data_img, data_mask, data_Y, test_size=0.33, random_state=seed, stratify=data_Y
)

In [None]:
def aug_data(X_train, y_train, y_class_train, aug_transforms=[]):
  X_train_aug = X_train
  y_train_aug = y_train
  y_class_train_aug = y_class_train

  for transform in aug_transforms:
    if transform == "horizontal_flip":
      X_train_aug = np.concatenate([X_train_aug, np.array([np.flipud(i) for i in X_train])])
      y_train_aug = np.concatenate([y_train_aug, np.array([np.flipud(i) for i in y_train])])
      y_class_train_aug = np.concatenate([y_class_train_aug, y_class_train])

    if transform == "vertical_flip":
      X_train_aug = np.concatenate([X_train_aug, np.array([np.fliplr(i) for i in X_train])])
      y_train_aug = np.concatenate([y_train_aug, np.array([np.fliplr(i) for i in y_train])])
      y_class_train_aug = np.concatenate([y_class_train_aug, y_class_train])

    if transform == "right_rotation":
      X_train_aug = np.concatenate([X_train_aug, np.array([np.rot90(i) for i in X_train])])
      y_train_aug = np.concatenate([y_train_aug, np.array([np.rot90(i) for i in y_train])])
      y_class_train_aug = np.concatenate([y_class_train_aug, y_class_train])

    rng = np.random.default_rng(seed=seed)
    rnd_num = rng.integers(0, len(X_train_aug), len(X_train_aug))
    X_train_aug = X_train_aug[rnd_num]
    y_train_aug = y_train_aug[rnd_num]
    y_class_train_aug = y_class_train_aug[rnd_num]

  return X_train_aug, y_train_aug, y_class_train_aug

In [None]:
X_train, y_train, y_class_train = aug_data(X_train, y_train, y_class_train, ["horizontal_flip", "vertical_flip"])
y_class_train = tf.keras.utils.to_categorical(y_class_train, 3)
y_class_test = tf.keras.utils.to_categorical(y_class_test, 3)

In [None]:
#hier manuell die Anzahl der Ebenen und Filter eingeben
network_levels = 4
network_filters = 32
network_params = [network_levels, network_filters]

#Constructing the U-net

In [None]:
#Encoder-Path-Block
def contract_layer(layer, f, f_size=(3,3), dropout=0.1, nopool=False):
    p = 0
    c = tf.keras.layers.Conv2D(
        f, f_size, activation="relu", kernel_initializer="he_normal", padding="same"
    )(layer)
    
    c = tf.keras.layers.Dropout(dropout)(c)
        
    c = tf.keras.layers.Conv2D(
        f, f_size, activation="relu", kernel_initializer="he_normal", padding="same"
    )(c)
    
    if nopool == False: #da die unterste Ebene keine Pooling-Operation erhält
        p = tf.keras.layers.MaxPooling2D((2, 2))(c)
        
    return c, p #gibt den Block vor und nach dem Pooling zurück, der pure Conv2d-Layer wird als Skip-Con. verwendet

In [None]:
#Decoder-Path-Block
def expand_layer(
    layer, concatlayer, f, fT_size=(2,2), f_size=(3,3), strides=(2,2), dropout=0.1
):
    up_layer = tf.keras.layers.Conv2DTranspose(
        f, fT_size, strides=strides, padding="same"
    )(layer)
    
    up_layer = tf.keras.layers.concatenate([up_layer, concatlayer])
    
    c = tf.keras.layers.Conv2D(
        f, f_size, activation="relu", kernel_initializer="he_normal", padding="same"
    )(up_layer)
    
    c = tf.keras.layers.Dropout(dropout)(c) 
    
    c = tf.keras.layers.Conv2D(
        f, f_size, activation="relu", kernel_initializer="he_normal", padding="same"
    )(c)

    c = tf.keras.layers.BatchNormalization(epsilon=2e-5, momentum=0.9)(c) #from VGG16
    return c

In [None]:
#Erstellung des Unets mit Hilfe obiger Blöcke
def buildCNN(h, w, c, levels, filtersize):
  layers = []
  drop = 0.0
  inputs = tf.keras.layers.Input((h, w, c))
  in_layer = inputs

  for l in range(levels):
    if (l % 2) == 0:
      drop = drop*10
      drop = (drop + 1)/10
    c_tmp, p_tmp = contract_layer(in_layer, filtersize, dropout=drop)
    in_layer = p_tmp
    layers.append([c_tmp, p_tmp])
    filtersize = int(2*filtersize)

  drop = drop*10
  drop = (drop + 1)/10
  in_layer, _ = contract_layer(in_layer, filtersize, dropout=drop, nopool=True)
  layers.append([in_layer, "_"])
  drop = drop*10
  drop = (drop - 1)/10
  filtersize = int(filtersize/2)

  for l in range(levels)[::-1]:
    c_tmp= expand_layer(layers[-1][0], layers[l][0], filtersize, dropout=drop)
    layers.append([c_tmp, "_"])
    if (l % 2) == 0:
      drop = drop*10
      drop = (drop - 1)/10
    filtersize = int(filtersize/2)

  out_layer = tf.keras.layers.Conv2D(1, (1, 1), activation="sigmoid")(layers[-1][0])
  
  model = tf.keras.Model(inputs=[inputs], outputs=[out_layer])

  return model

In [None]:
model1 = buildCNN(img_height, img_width, img_channel, network_levels, network_filters)
model1.compile(optimizer="adam", loss=[jaccard_coef_loss], metrics=[jaccard_coef, "accuracy"])

In [None]:
monitor = "val_loss"
callbacks1 = [
             tf.keras.callbacks.EarlyStopping(patience=20, monitor=monitor), 
             tf.keras.callbacks.ModelCheckpoint("best_model_weights.h5", verbose=1, save_best_only=True, mode="min"),
             tf.keras.callbacks.ReduceLROnPlateau(monitor=monitor, factor=0.1, patience=10, verbose=1, min_delta=1e-4, mode="min"),
             tf.keras.callbacks.TensorBoard("logs", histogram_freq=1), #nur damit während des langen Trainings einsicht auf Loss und Metrik möglich ist
             ] 

In [None]:
network_history1 = model1.fit(
    X_train, y_train, validation_split=0.2, batch_size=16, epochs=epochs, callbacks=callbacks1
)

In [None]:
model1.load_weights(filepath="best_model_weights.h5")

#Constructing the Attention-Recurrent-Residual-U-net

In [None]:
#Attention-Block
def AttBlock(x, g, filter):
  theta = tf.keras.layers.Conv2D(filter, (2 ,2), strides=(2, 2), padding="same")(x)
  phi = tf.keras.layers.Conv2D(filter, (1 ,1), strides=(1, 1), padding="same")(g)
  phi = tf.keras.layers.Conv2DTranspose(filter, (3, 3), strides=(1, 1), padding="same")(phi)

  added_layers = tf.keras.layers.Add()([phi, theta])
  added_layers = tf.keras.layers.Activation("relu")(added_layers)

  psi = tf.keras.layers.Conv2D(1, (1 ,1), padding="same")(added_layers)
  psi = tf.keras.layers.Activation("sigmoid")(psi)

  psi = tf.keras.layers.Conv2DTranspose(filter, (3, 3), strides=(2, 2), padding="same")(psi)

  up_layer = tf.keras.layers.multiply([psi, x])
  up_layer = tf.keras.layers.Conv2D(filter, (1, 1), padding="same")(up_layer)
  up_layer = tf.keras.layers.BatchNormalization()(up_layer)

  return up_layer

In [None]:
#Encoder-Block, diesmal mit Residual- und Rec-Operationen
def R2Block(x, filter, kernel=(3,3), dropout=0.1, nopool=False): #filter = filtersize = #features, kernel=kernel_size
  skip_connect = tf.keras.layers.Conv2D(filter, 1, padding="same")(x) #skip connection
  skip_connect = tf.keras.layers.BatchNormalization()(skip_connect)
  x = skip_connect
  for i in range(2): #corresponds to c1 and c2 compared to normal unet
    c = tf.keras.layers.Conv2D(filter, kernel, padding="same")(x)
    c = tf.keras.layers.BatchNormalization()(c)
    c = tf.keras.layers.Activation("relu")(c)
    for i in range(2): #recurrent loop for each c_2
      added_layers = tf.keras.layers.Add()([c, x])
      c = tf.keras.layers.Conv2D(filter, kernel, padding="same")(added_layers)
      c = tf.keras.layers.BatchNormalization()(c)
      c = tf.keras.layers.Activation("relu")(c)
    x = c

  connected = tf.keras.layers.Add()([skip_connect, x]) #hier Residual
  connected = tf.keras.layers.Activation("relu")(connected)
  
  if nopool == False: 
      x = tf.keras.layers.MaxPooling2D((2, 2))(connected)
      
  return connected, x

In [None]:
def UpSamplingBlock(x, connection, filter, kernel=(3,3), strides=(2,2), dropout=0.1): #x = down_layer
  #gating-signal
  gating_signal = tf.keras.layers.Conv2D(filter, (1,1), padding="same")(x) #to get down_layer features to size of up_layer
  gating_signal = tf.keras.layers.BatchNormalization()(gating_signal)
  gating_signal = tf.keras.layers.Activation("relu")(gating_signal)
  
  #attention-block
  att = AttBlock(connection, gating_signal, filter) #attention block with r2blocks
  up_layer = tf.keras.layers.Conv2DTranspose(filter, kernel, strides=strides, activation="relu", padding="same")(x)
  concated_up_layer = tf.keras.layers.concatenate([up_layer, att])

  #R2-block
  concated_conv, _ = R2Block(concated_up_layer, filter)

  return concated_conv

In [None]:
#Erstellt das AR2-U-Net 
def ResbuildCNN(h, w, c, levels, filtersize):
  tf.keras.backend.clear_session()

  layers = []  #saves conv blocks for concatenate in extensive path + pooled layers for next lower level in contraction path             
  inputs = tf.keras.layers.Input((h, w, c))
  in_layer = inputs

  for l in range(levels):
    c_tmp, p_tmp = R2Block(in_layer, filtersize)
    in_layer = p_tmp
    layers.append([c_tmp, p_tmp])
    filtersize = int(2*filtersize)

  in_layer, _ = R2Block(in_layer, filtersize, nopool=True)
  layers.append([in_layer, "_"])
  filtersize = int(filtersize/2)

  for l in range(levels)[::-1]:
    c_tmp = UpSamplingBlock(layers[-1][0], layers[l][0], filtersize) 
    layers.append([c_tmp, "_"])
    filtersize = int(filtersize/2)

  out_layer = tf.keras.layers.Conv2D(1, (1, 1), activation="sigmoid")(layers[-1][0])
  
  model = tf.keras.Model(inputs=[inputs], outputs=[out_layer])
  return model

In [None]:
model2 = ResbuildCNN(img_height, img_width, img_channel, network_levels, network_filters)
model2.compile(optimizer="adam", loss=[jaccard_coef_loss], metrics=[jaccard_coef, "accuracy"])

In [None]:
monitor = "val_loss"
callbacks2 = [
             tf.keras.callbacks.EarlyStopping(patience=20, monitor=monitor), 
             tf.keras.callbacks.ModelCheckpoint("best_model2_weights.h5", verbose=1, save_best_only=True, monitor="val_loss", mode="min"),
             tf.keras.callbacks.ReduceLROnPlateau(monitor=monitor, factor=0.1, patience=10, verbose=1, min_delta=1e-4, mode="min"),
             tf.keras.callbacks.TensorBoard("logs", histogram_freq=1),
             ] 

In [None]:
# %tensorboard --logdir logs

In [None]:
network_history2 = model2.fit(
    X_train, y_train, validation_split=0.2, batch_size=16, epochs=epochs, callbacks=callbacks2
)

In [None]:
model2.load_weights(filepath="best_model2_weights.h5")

#Evaluate the models

In [None]:
preds_test1 = model1.predict(X_test, verbose=1)
preds_test2 = model2.predict(X_test, verbose=1)

preds_test1 = np.squeeze(preds_test1 > 0.5)
preds_test2 = np.squeeze(preds_test2 > 0.5)

print("model1: ", model1.evaluate(X_test, y_test, verbose=0))
print("model2: ", model2.evaluate(X_test, y_test, verbose=0))

#Save Models and Histories

In [None]:
#plottet die Loss- und Metrik-Kurven
def plot_history(history, save_path, model_name): 
  plt.figure(figsize=(8, 6))
  plt.xlabel('Epochs')
  plt.ylabel('Jaccard Loss')
  plt.plot(history['loss'])
  plt.plot(history['val_loss'])
  plt.legend(['Training', 'Validation'])
  plt.grid(ls="dotted")
  plt.savefig(save_path + "/" + model_name + "_att_r2_unet_jaccard_loss.pdf")
  plt.clf()

  plt.figure(figsize=(8, 6))
  plt.xlabel('Epochs')
  plt.ylabel('Jaccard coefficient')
  plt.plot(history['jaccard_coef'])
  plt.plot(history['val_jaccard_coef'])
  plt.legend(['Training', 'Validation'])
  plt.grid(ls="dotted")
  plt.savefig(save_path + "/" + model_name + "_att_r2_unet_jaccard_coef.pdf")
  plt.clf()

  plt.figure(figsize=(8, 6))
  plt.xlabel('Epochs')
  plt.ylabel('accuracy')
  plt.plot(history['accuracy'])
  plt.plot(history['val_accuracy'])
  plt.legend(['Training', 'Validation'])
  plt.grid(ls="dotted")
  plt.savefig(save_path + "/" + model_name + "_att_r2_unet_accuracy.pdf")
  plt.clf()

In [None]:
#Speichert die Modelle, sowie die Loss- und Metrik-Kurven, sowie die Maximalen Scores auf dem Testdatensatz
def save_history(model1, model2, history1, history2, new_run = True):
  save_path = drive_path_history + str(img_height) 

  if os.path.isdir(save_path) == False:
    os.mkdir(save_path)

  save_path = drive_path_history + str(img_height) + "/" + "levels_" + str(network_levels) + "-" + "filters_" + str(network_filters)
  current_run = "0" 

  if os.path.isdir(save_path) == False:
    os.mkdir(save_path)
  runs = os.listdir(save_path)
  if len(runs) == 0:
    current_run = "/01"
    save_path = save_path + current_run
    os.mkdir(save_path)
  else:
    run_int = int(runs[-1])
    if new_run == True:
      run_int = run_int + 1
      if run_int < 10:
        current_run = "/0" + str(run_int)
      else:
        current_run = "/" + str(run_int)
      save_path = save_path + current_run
      os.mkdir(save_path)
    else:
        current_run = "/" + runs[-1]
        save_path = save_path + current_run
  model1.save(save_path + "/model1_weights.h5")
  model2.save(save_path + "/model2_weights.h5")
  
  pd.DataFrame(history1.history).to_csv(save_path + "/model1_history.csv", index=False)
  pd.DataFrame(history2.history).to_csv(save_path + "/model2_history.csv", index=False)

  plot_history(history1.history, save_path, "model1")
  plot_history(history1.history, save_path, "model2")

  np.savetxt(save_path + "/model1_score.txt", model1.evaluate(X_test, y_test, verbose=0))
  np.savetxt(save_path + "/model2_score.txt", model2.evaluate(X_test, y_test, verbose=0))

save_history(model1, model2, network_history1, network_history2, False) #False damit kein neuer Ordner mit neuem Model erstellt wird

In [None]:
#sucht das beste U-Net Modell und das beste AR2-U-Net-Modell, sowie die Scores aller Modelle
def get_models():
    models = ["model1", "model2"]
    highscore = []
    score_comp = []
    for model in models:
        scores_list = glob.glob(drive_path_history + str(img_height) + "/**/*"+ model + "_score.txt", recursive=True)
        scores = np.zeros(len(scores_list))
        score_comp_arr = np.zeros((len(scores), 3))
        for i, s in enumerate(scores_list):
            scores[i] = np.genfromtxt(s)[1]
            
            start = s.find("levels_") + len("levels_")
            end = s.find("-filter")
            l = s[start:end]
            start = s.find("filters_") + len("filters_")
            end = -20
            f = s[start:end]
            
            score_comp_arr[i, 0] = l
            score_comp_arr[i, 1] = f
            score_comp_arr[i, 2] = scores[i]
        score_comp.append(score_comp_arr)
        best_model_path = scores_list[scores.argmax()][:-9] + "weights.h5"
        highscore.append(best_model_path)
    return highscore, score_comp
highscore, scores = get_models()

In [None]:
#Zur Überprüfung, welches das beste Model ist
highscore

In [None]:
#lädt die besten Modelle, und erstellt die Vorhersagen
best_model1 = tf.keras.models.load_model(filepath=highscore[0], compile=False)
best_model2 = tf.keras.models.load_model(filepath=highscore[1], compile=False)

best_model1.compile(optimizer="adam", loss=[jaccard_coef_loss], metrics=[jaccard_coef, "accuracy"])
best_model2.compile(optimizer="adam", loss=[jaccard_coef_loss], metrics=[jaccard_coef, "accuracy"])

preds_test1 = best_model1.predict(X_test, verbose=1)
preds_test2 = best_model2.predict(X_test, verbose=1)

preds_test1 = np.squeeze(preds_test1 > 0.5)
preds_test2 = np.squeeze(preds_test2 > 0.5)

print("model1: ", best_model1.evaluate(X_test, y_test, verbose=0))
print("model2: ", best_model2.evaluate(X_test, y_test, verbose=0))

print(highscore)

In [None]:
best_model1.count_params()

In [None]:
best_model2.count_params()

In [None]:
#Vergleichsplot aller Modelle
def plot_comparison(scores, save_path):
  scores_sorted_list = []

  fig = plt.figure(figsize=(10, 8))
  ax = fig.add_subplot(1, 1, 1)
  plot_length = np.bincount(scores[0][:, 0].astype("int64"))

  for s in scores:
    k = 0
    score_counter = np.bincount(s[:,0].astype("int64"))
    plot_length = score_counter.sum()
    for i in score_counter:
      if i != 0:
        score_mask = s[:, 0] == k
        score_sorted = np.argsort(s[score_mask][:, 1])
        tmp_array = s[score_mask][score_sorted]
        scores_sorted_list.append(tmp_array)
      k = k + 1
  scores_sorted_arr = np.concatenate(scores_sorted_list, axis=0 )
  x = np.arange(0, plot_length)
  x_label = scores_sorted_arr[:plot_length, 1].astype("int")

  plt.xticks(x, x_label)
  ax.plot(x, scores_sorted_arr[:plot_length, 2], "o", label="U-Net")
  ax.plot(x, scores_sorted_arr[plot_length:, 2], "ro", label="AR2-U-Net")

  minimum = scores_sorted_arr[:, 2].min()
  maximum = scores_sorted_arr[:, 2].max()
  height = maximum - minimum

  box_count = np.unique(scores_sorted_arr[:, 0])
  colors = cm.Reds(np.linspace(0.3, 0.9, len(box_count)))

  count = 0

  i_1 = scores_sorted_arr[:plot_length][0, 1]
  m = 0
  m_list = []
  for i in scores_sorted_arr[:plot_length][1:, 1]:
    if i < i_1:
      i_1 = i
      m_list.append(m)
    i_1 = i
    m = m + 1
  m_list = np.asarray(m_list)   
  start = 0
  tmp_i = 0

  for i, (j, c) in enumerate(zip(box_count, colors)):
    if i < len(m_list):
      end = m_list[i]

      tmp = x[start:end]
      width = len(tmp) + 0.2
      rect = patches.Rectangle((start-0.1, minimum-0.05), width, height+height/2, linewidth=1, edgecolor=c, facecolor=c, alpha=0.3)
      ax.add_patch(rect)
      start = end+1
      textpos = tmp[0] +((tmp[-1]+1 - tmp[0])/2)
      plt.text(textpos-0.25, minimum-0.005, "n = " + str(int(j)))
    else:
      end = x[-1]
      tmp = x[start:end]
      
      width = len(tmp) + 0.2
#         if width == 0.2:
#             width = 
      rect = patches.Rectangle((start-0.1, minimum-0.05), width, height+height/2, linewidth=1, edgecolor=c, facecolor=c, alpha=0.3)
      ax.add_patch(rect)
      if len(tmp) > 1:
        textpos = tmp[0] +((tmp[-1]+1 - tmp[0])/2)
      else: 
        textpos = x[-1]

      plt.text(textpos-0.25, minimum-0.005, "n = " + str(int(j)))

  ax.set_ylim([minimum - 0.05, maximum + 0.05])
  ax.legend()
  ax.grid(ls="dotted")
  ax.set_xlabel("$l(n)$")
  ax.set_ylabel("$J$")
  plt.savefig(save_path + "/" + str(img_height) + "/model_comparison.pdf")
plot_comparison(scores, drive_path_history)

In [None]:
#Berechnung der einzelnen Jaccard-Koeffizienten (Für Plot-Legende)
coefs1 = np.zeros(len(y_test))
coefs2 = np.zeros(len(y_test))

for n, (i, j) in enumerate(zip(y_test, np.float64(preds_test1))):
  c = jaccard_coef(i, j)
  if c > 1:
    c = 1
  coefs1[n] = c

for n, (i, j) in enumerate(zip(y_test, np.float64(preds_test2))):
  c = jaccard_coef(i, j)
  if c > 1:
    c = 1
  coefs2[n] = c

In [None]:
# Plottet Beispiel Bilder, inkl. Groundtruth + Prediction beider Modelle
def print_img(a):
  fig, axs = plt.subplots(2, 4, figsize=(15, 10))
  axs[0, 0].imshow(X_test[a], cmap='gray')
  axs[0, 1].imshow(y_test[a], cmap='gray')
  
  axs[0, 2].imshow(preds_test1[a], cmap='gray')
  axs[0, 2].plot(0, 0, "k.", label=coefs1[a])
  axs[0, 2].legend()

  axs[0, 3].imshow(preds_test2[a], cmap='gray')
  axs[0, 3].plot(0, 0, "k.", label=coefs2[a])
  axs[0, 3].legend()
  
  a = a + 3
  axs[1, 0].imshow(X_test[a], cmap='gray')
  axs[1, 1].imshow(y_test[a], cmap='gray')
  
  axs[1, 2].imshow(preds_test1[a], cmap='gray')
  axs[1, 2].plot(0, 0, "k.", label=coefs1[a])
  axs[1, 2].legend()

  axs[1, 3].imshow(preds_test2[a], cmap='gray')
  axs[1, 3].plot(0, 0, "k.", label=coefs2[a])
  axs[1, 3].legend()

  plt.savefig(drive_path_history + "/" + str(img_height) +  "/" + str(a) + "_comp.pdf")
rng_img = np.random.default_rng(seed=seed)
rnd_num = rng_img.integers(0, len(X_test), 1)
print_img(rnd_num[0])

#A  small comparison to classical Canny Edge Detection

In [None]:
#Canny Edge Detection
from scipy import ndimage as ndi
from skimage import feature
blurred_img = [ndi.gaussian_filter(X_test[i], 4) for i in range(0,len(X_test))]
data_canny_edge = [feature.canny(blurred_img[i], sigma=3) for i in range(0,len(X_test))]

In [None]:
a = 30
plt.figure(figsize=(13, 3))
plt.subplot(141)
plt.imshow(X_test[a], cmap='gray')
plt.subplot(142)
plt.imshow(y_test[a], cmap='gray')
plt.subplot(143)
plt.imshow(preds_test1[a], cmap='gray')
plt.subplot(144)
plt.imshow(data_canny_edge[a], cmap='gray')
plt.savefig(drive_path_history + "/" + str(img_height) + "/canny_edge.pdf")
