# Building a U-Net with the Keras Functional API

For the model-subclassing approach to building the same network, refer to `./UnetExperiment_Subclass.ipynb`.

In [114]:
# Model building
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import backend as K
# from keras.backend.common import _EPSILON
# from tensorflow.keras.backend import common 

# Misc
import numpy as np
import os
from datetime import datetime as dt
import jsonref
import json

# Save
import pickle

from sklearn.model_selection import train_test_split

# For custom weighted loss
# from tensorflow.keras.backend import _to_tensor

In [117]:
# Report the Keras and TensorFlow versions in use, one per line.
print(keras.__version__, tf.__version__, sep="\n")

2.4.0
2.3.1


In [3]:
# Current local time formatted as YYYYMMDD_HHMMSS.
format(dt.now(), "%Y%m%d_%H%M%S")

'20201205_153027'

---

In [10]:
# Sample targets and predictions for two samples. These literals are the
# values shown in this notebook's recorded outputs; defining them here lets
# the cell run on a fresh kernel instead of relying on leftover state.
y_true = np.array([[0, 1], [0, 1]])
y_pred = np.array([[0.2, 0.9], [0.1, 0.8]])

# Per-class weights as a constant tensor.
weights = [1, 9]
K_weights = K.constant(weights)

# Make sure y_pred is a tensor before its dtype is read below.
if not K.is_tensor(y_pred):
    y_pred = K.constant(y_pred)

# Cast y_true to the datatype of y_pred.
y_true = K.cast(y_true, y_pred.dtype)


tf.Tensor([1. 9.], shape=(2,), dtype=float32)


In [87]:



# Show the inputs first (the recorded output of this cell includes them),
# then a blank line, then the backend's categorical cross-entropy so it can be
# compared with the manual computation in the next cell.
print(y_true)
print(y_pred)
print()
print(K.categorical_crossentropy(target=y_true, output=y_pred, from_logits=False, axis=-1))

tf.Tensor(
[[0. 1.]
 [0. 1.]], shape=(2, 2), dtype=float32)
tf.Tensor(
[[0.2 0.9]
 [0.1 0.8]], shape=(2, 2), dtype=float32)

tf.Tensor([0.20067078 0.11778302], shape=(2,), dtype=float32)


In [44]:
# Manual re-derivation of the categorical cross-entropy printed above:
# scale each row of y_pred so it sums to 1, take the log, then compute
# -sum(y_true * log(p)) over the last axis.
print(y_pred)
denom = tf.reduce_sum(y_pred, axis=1, keepdims=True)
print(denom)

# Keep the normalised values under a new name so y_pred is not overwritten and
# the cell gives the same result when it is re-run.
y_pred_norm = y_pred / denom
print(y_pred_norm)

log_probs = tf.math.log(y_pred_norm)
print(log_probs)
print(-tf.reduce_sum(y_true * log_probs, axis=-1))

tf.Tensor(
[[0.2 0.9]
 [0.1 0.8]], shape=(2, 2), dtype=float32)
tf.Tensor(
[[1.1       ]
 [0.90000004]], shape=(2, 1), dtype=float32)
tf.Tensor(
[[0.18181819 0.81818175]
 [0.11111111 0.8888889 ]], shape=(2, 2), dtype=float32)
tf.Tensor(
[[-1.704748   -0.20067078]
 [-2.1972246  -0.11778302]], shape=(2, 2), dtype=float32)
tf.Tensor([0.20067078 0.11778302], shape=(2,), dtype=float32)


In [144]:
def weighted_pixelwise_crossentropy(class_weights):
    """Build a weighted cross-entropy loss function.

    Args:
        class_weights: Tensor or array-like of weights. It is multiplied
            element-wise with ``y_true * log(y_pred)``, so it must be
            broadcastable to that product.

    Returns:
        A function ``compute_loss(y_true, y_pred)`` that returns a scalar
        tensor: ``-sum(class_weights * y_true * log(y_pred))``.
    """

    def compute_loss(y_true, y_pred):
        """Return the weighted cross-entropy summed over every element."""
        # Align dtypes so integer masks / non-tensor inputs do not make
        # tf.multiply fail with a dtype mismatch.
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)
        weights = tf.cast(class_weights, y_pred.dtype)

        # Clip y_pred to prevent log(0).
        epsilon = K.epsilon()
        y_pred = tf.clip_by_value(t=y_pred, clip_value_min=epsilon, clip_value_max=1. - epsilon)

        # NOTE(review): elements where y_true == 0 are multiplied by zero here,
        # so whatever weight they carry has no effect on the loss — confirm
        # this is intended.
        ce_matrix = tf.multiply(y_true, tf.math.log(y_pred))
        weighted_ce_loss = - tf.reduce_sum(tf.multiply(ce_matrix, weights))

        return weighted_ce_loss

    return compute_loss

def get_class_weights(y_true, class_0_weight, class_1_weight):
    """Return a constant tensor of per-element weights derived from `y_true`.

    Elements of `y_true` equal to 0 receive `class_0_weight`; every other
    element receives `class_1_weight`.
    """
    is_class_0 = (y_true == 0)
    weight_map = np.where(is_class_0, class_0_weight, class_1_weight)
    return K.constant(weight_map)


In [149]:
# Toy targets and predictions of shape (2, 2, 2), converted straight to
# constant tensors.
y_true = K.constant(np.array([[[0, 1], [0, 0]], [[1, 0], [1, 1]]]))
y_pred = K.constant(np.array([[[0.2, 0.9], [0.1, 0.3]], [[0.9, 0.1], [0.1, 0.2]]]))

print(y_true, y_pred, sep="\n")

tf.Tensor(
[[[0. 1.]
  [0. 0.]]

 [[1. 0.]
  [1. 1.]]], shape=(2, 2, 2), dtype=float32)
tf.Tensor(
[[[0.2 0.9]
  [0.1 0.3]]

 [[0.9 0.1]
  [0.1 0.2]]], shape=(2, 2, 2), dtype=float32)


In [143]:
# Build the per-element weight tensor from the toy targets, wrap it in the
# loss function, and evaluate the loss on the toy predictions.
# NOTE(review): this cell's execution count is lower than that of the cell
# defining the loss, so the recorded output appears to predate the current
# definition — re-run to refresh it.
class_weights = get_class_weights(y_true, class_0_weight=0.1, class_1_weight=0.9)
compute_loss = weighted_pixelwise_crossentropy(class_weights=class_weights)
print(compute_loss(y_true=y_true, y_pred=y_pred))

tf.Tensor(0.29565367, shape=(), dtype=float32)


In [79]:
train_full_img_dir = "../data/preprocessed/Mass/Train_FULL"
train_mask_img_dir = "../data/preprocessed/Mass/Train_MASK"
test_full_img_dir = "../data/preprocessed/Mass/Test_FULL"
test_mask_img_dir = "../data/preprocessed/Mass/Test_MASK"
extension = ".png"
valtest_split = 0.5


def _list_image_paths(img_dir, extension):
    """Return the sorted paths of the files in `img_dir` whose names end with `extension`."""
    return sorted(
        os.path.join(img_dir, filename)
        for filename in os.listdir(img_dir)
        if filename.endswith(extension)
    )


# Sorted so that FULL and MASK images are in an order that corresponds with
# each other.
# Fix: the train mask paths used to be built from the filenames found in
# test_full_img_dir; they are now listed from train_mask_img_dir.
train_full_img_paths = _list_image_paths(train_full_img_dir, extension)
train_mask_img_paths = _list_image_paths(train_mask_img_dir, extension)
test_full_img_paths = _list_image_paths(test_full_img_dir, extension)
test_mask_img_paths = _list_image_paths(test_mask_img_dir, extension)

# Pairing by sorted position only makes sense when the counts match.
assert len(train_full_img_paths) == len(train_mask_img_paths), \
    "Number of train images and train masks differ."

# Split test paths into validation and test sets. Splitting images and masks
# in one call applies the same indices to both lists and raises if their
# lengths differ.
valid_x, test_x, valid_y, test_y = train_test_split(test_full_img_paths,
                                                    test_mask_img_paths,
                                                    test_size=valtest_split,
                                                    random_state=42)

In [49]:
# Replicate a 2-D array three times along a new trailing axis and show the
# shapes before and after, followed by the stacked values.
img = np.array([[1, 2], [3, 4]])
stacked_img = np.stack((img,) * 3, axis=-1)

print(img.shape, stacked_img.shape, stacked_img, sep="\n")

(2, 2)
(2, 2, 3)
[[[1 1 1]
  [2 2 2]]

 [[3 3 3]
  [4 4 4]]]


In [6]:
def build_encoder(encoder_input_shape=(448, 448, 3)):
    """Build a frozen VGG16 encoder that returns the output of every layer.

    Args:
        encoder_input_shape: Input shape passed to VGG16. The original body
            read `self.encoder_input_shape`, which does not exist in a plain
            function; the default is the shape used when the U-Net is
            instantiated later in this notebook.

    Returns:
        A `keras.Model` whose input is the VGG16 input and whose outputs are
        the outputs of all VGG16 layers, in layer order, with
        `trainable = False`.
    """
    # Get base model (convolutional part only, "imagenet" weights)
    VGG16_ = keras.applications.VGG16(include_top=False,
                                      weights="imagenet",
                                      input_shape=encoder_input_shape)

    # Expose every layer's output so skip connections can be picked by index
    all_layer_outputs = [layer.output for layer in VGG16_.layers]

    # Create encoder model
    encoder_model = keras.Model(inputs=VGG16_.input, outputs=all_layer_outputs)

    # Freeze layers
    encoder_model.trainable = False

    return encoder_model

In [7]:
# Bind the function object itself (it is not called here), so `m` is a
# function rather than a model.
m = build_encoder

In [8]:
# Inspect what `m` is bound to.
type(m)

function

In [85]:
class UnetVGG16:
    """Builder for a U-Net style model with a frozen VGG16 encoder (Keras Functional API).

    Args:
        encoder_input_shape: Input shape for both the U-Net input layer and
            the VGG16 encoder.
        learning_rate: Stored on the instance; not used by this class.
        batch_size: Stored on the instance; not used by this class.
        kernel_size: Kernel size of every decoder convolution (transposed and
            regular) and of the final convolution.
        decoder_strides: Strides of every decoder `Conv2DTranspose`.
        decoder_padding: Padding of every decoder `Conv2DTranspose`.
        decoder_activation: Activation of every decoder `Conv2DTranspose`.
    """

    # Positions in the encoder's list of layer outputs that feed the skip
    # connections, shallowest first.
    # NOTE(review): presumably the last conv layer of each VGG16 block —
    # confirm against `VGG16_.layers`.
    _SKIP_LAYER_INDICES = (2, 5, 9, 13, 17)

    # One entry per decoder block, in build order:
    # (layer-name prefix, divisor applied to the encoder's channel count for
    #  the transposed conv, number of Conv2D layers, position in skip_outputs).
    _DECODER_BLOCKS = (
        ("block5", 1, 3, 4),
        ("block4", 1, 3, 3),
        ("block3", 2, 3, 2),
        ("block2", 4, 2, 1),
        ("block1", 4, 2, 0),
    )

    def __init__(self, encoder_input_shape, learning_rate, batch_size, kernel_size, decoder_strides, decoder_padding, decoder_activation):

        self.encoder_input_shape = encoder_input_shape
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.kernel_size = kernel_size
        self.decoder_strides = decoder_strides
        self.decoder_padding = decoder_padding
        self.decoder_activation = decoder_activation

    def build_encoder(self):
        """Return a frozen VGG16 model that outputs every one of its layers' outputs."""

        # Get base model (convolutional part only, "imagenet" weights)
        VGG16_ = keras.applications.VGG16(include_top=False,
                                          weights="imagenet",
                                          input_shape=self.encoder_input_shape)

        # Expose every layer's output so skip connections can be picked by index
        all_layer_outputs = [layer.output for layer in VGG16_.layers]

        # Create encoder model
        encoder_model = keras.Model(inputs=VGG16_.input, outputs=all_layer_outputs)

        # Freeze layers
        encoder_model.trainable = False

        return encoder_model

    def _decoder_block(self, x, skip, block_name, up_filters, conv_filters, n_convs):
        """Apply one decoder block: transposed conv, concat with `skip`, then `n_convs` convs.

        Layers are named `<block_name>_up_convT`, `<block_name>_up_concat` and
        `<block_name>_up_conv<k>` with k counting down from `n_convs` to 1.
        """
        x = keras.layers.Conv2DTranspose(name="{}_up_convT".format(block_name),
                                         filters=up_filters,
                                         kernel_size=self.kernel_size,
                                         strides=self.decoder_strides,
                                         padding=self.decoder_padding,
                                         activation=self.decoder_activation)(x)

        x = keras.layers.Concatenate(name="{}_up_concat".format(block_name), axis=-1)([x, skip])

        for conv_number in range(n_convs, 0, -1):
            x = keras.layers.Conv2D(name="{}_up_conv{}".format(block_name, conv_number),
                                    filters=conv_filters,
                                    kernel_size=self.kernel_size,
                                    strides=(1, 1),
                                    padding="same",
                                    activation="relu")(x)

        return x

    def build_unet(self):
        """Assemble and return the `keras.Model` named "Unet_VGG16"."""

        # =============
        #  Input layer
        # =============

        unet_input = keras.Input(shape=self.encoder_input_shape, name="unet_input_layer")

        # =========
        #  Encoder
        # =========

        encoder_model = self.build_encoder()
        all_encoder_layer_outputs = encoder_model(unet_input)

        # Final encoder output (this is the input for the decoder)
        encoded_img = all_encoder_layer_outputs[-1]

        # Outputs used for skip connections
        skip_outputs = [all_encoder_layer_outputs[i] for i in self._SKIP_LAYER_INDICES]

        # =========
        #  Decoder
        # =========

        decoder_filters = encoded_img.shape[-1]

        x = encoded_img
        for block_name, divisor, n_convs, skip_position in self._DECODER_BLOCKS:
            x = self._decoder_block(x,
                                    skip_outputs[skip_position],
                                    block_name,
                                    # Floor division keeps the filter count an int
                                    # (plain `/` produced a float).
                                    up_filters=decoder_filters // divisor,
                                    conv_filters=decoder_filters,
                                    n_convs=n_convs)

        # ------------------------------------------
        # Final conv layer
        # NOTE(review): the output is a 3-filter ReLU convolution, while the
        # compile cell in this notebook uses binary_crossentropy — confirm the
        # intended output activation and channel count.
        final_img = keras.layers.Conv2D(name="final_up_conv",
                                        filters=3,
                                        kernel_size=self.kernel_size,
                                        strides=(1, 1),
                                        padding="same",
                                        activation="relu")(x)

        # ======
        #  Unet
        # ======

        unet = keras.Model(inputs=unet_input, outputs=final_img, name="Unet_VGG16")

        return unet

In [86]:
# Configure the U-Net builder: architecture settings first, then the
# training-related values that are stored on the instance.
unet = UnetVGG16(
    # Architecture
    encoder_input_shape=(448, 448, 3),
    kernel_size=(3, 3),
    decoder_strides=(2, 2),
    decoder_padding="same",
    decoder_activation=None,
    # Training-related values
    learning_rate=0.001,
    batch_size=1,
)

In [87]:
# Assemble the Keras model from the configured builder.
unet_model = unet.build_unet()

In [88]:
# Print the layer-by-layer summary of the assembled model.
unet_model.summary()

Model: "Unet_VGG16"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
unet_input_layer (InputLayer)   [(None, 448, 448, 3) 0                                            
__________________________________________________________________________________________________
functional_16 (Functional)      [(None, 448, 448, 3) 14714688    unet_input_layer[0][0]           
__________________________________________________________________________________________________
block5_up_convT (Conv2DTranspos (None, 28, 28, 512)  2359808     functional_16[0][18]             
__________________________________________________________________________________________________
block5_up_concat (Concatenate)  (None, 28, 28, 1024) 0           block5_up_convT[0][0]            
                                                                 functional_16[0][17]    

In [95]:
# Save a diagram of the model. Requires pydot and graphviz to be installed
# (see the message recorded below this cell).
plot_path = "./experimental_outputs/unet_model_v1.png"

# Make sure the output directory exists before the image is written.
os.makedirs(os.path.dirname(plot_path), exist_ok=True)

keras.utils.plot_model(unet_model,
                       to_file=plot_path,
                       show_shapes=True,
                       show_layer_names=True,
                       dpi=300)

('Failed to import pydot. You must `pip install pydot` and install graphviz (https://graphviz.gitlab.io/download/), ', 'for `pydotprint` to work.')


In [41]:
# Compile with Adam using the learning rate configured on the `unet` builder,
# so that setting actually takes effect (it was previously stored but unused).
unet_model.compile(optimizer=keras.optimizers.Adam(learning_rate=unet.learning_rate),
                   loss="binary_crossentropy",
                   metrics=["accuracy"])

In [42]:
# Short smoke-test training run: 2 epochs of 5 steps each.
# NOTE(review): `GenerateInputs`, `x_train` and `y_train` are not defined in
# the visible part of this notebook — confirm they are defined before this
# cell runs on a fresh kernel.
unet_model.fit(GenerateInputs(X=x_train, y=y_train), batch_size=1, steps_per_epoch = 5, epochs=2, verbose=2)

Epoch 1/2
5/5 - 12s - loss: 2.5556 - accuracy: 0.3837
Epoch 2/2
5/5 - 13s - loss: 2.1758 - accuracy: 0.3576


<tensorflow.python.keras.callbacks.History at 0x152ba6130>

In [43]:
# List the top-level layers of the model, one per line.
for layer in unet_model.layers:
    print(layer)

<tensorflow.python.keras.engine.input_layer.InputLayer object at 0x152ba61c0>
<tensorflow.python.keras.engine.functional.Functional object at 0x153182a30>
<tensorflow.python.keras.engine.functional.Functional object at 0x153229880>
<tensorflow.python.keras.layers.convolutional.Conv2D object at 0x153229a60>
