In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import backend as K
# from keras.backend.common import _EPSILON
# from tensorflow.keras.backend import common 

# Misc
import numpy as np
import os
from datetime import datetime as dt
import jsonref
import json

# Save
import pickle

from sklearn.model_selection import train_test_split

# For custom weighted loss
# from tensorflow.keras.backend import _to_tensor

In [3]:
# Report the installed Keras and TensorFlow versions, one per line.
print(keras.__version__, tf.__version__, sep="\n")

2.6.0
2.6.0


In [7]:
# Directories holding the preprocessed full images and their merged masks.
train_full_img_dir = "/datasets/mamografia/CBIS-DDSM_organized/images/preprocessed/train/full"
train_mask_img_dir = "/datasets/mamografia/CBIS-DDSM_organized/images/preprocessed/train/merged_masks"
test_full_img_dir = "/datasets/mamografia/CBIS-DDSM_organized/images/preprocessed/test/full"
test_mask_img_dir = "/datasets/mamografia/CBIS-DDSM_organized/images/preprocessed/test/merged_masks"
extension = ".png"
# Fraction of the test folder that stays "test"; the rest becomes validation.
valtest_split = 0.5


def _list_image_paths(img_dir, ext):
    """Return the sorted paths of all files in `img_dir` whose name ends with `ext`.

    Sorting is what keeps the FULL and MASK lists in an order that corresponds
    with each other, so every list is built through this single helper.
    """
    return sorted(
        os.path.join(img_dir, file_name)
        for file_name in os.listdir(img_dir)
        if file_name.endswith(ext)
    )


# Get paths of train images and masks.
train_full_img_paths = _list_image_paths(train_full_img_dir, extension)
# The mask file names must come from the train mask directory itself (listing
# the test image directory here would pair train images with wrong or
# non-existent mask files).
train_mask_img_paths = _list_image_paths(train_mask_img_dir, extension)

# Get paths of test images and masks.
test_full_img_paths = _list_image_paths(test_full_img_dir, extension)
test_mask_img_paths = _list_image_paths(test_mask_img_dir, extension)

# Split test paths into validation and test sets. Images and masks are split in
# one call so both receive exactly the same shuffle; a length mismatch between
# the two lists raises instead of silently mis-pairing images and masks.
valid_x, test_x, valid_y, test_y = train_test_split(
    test_full_img_paths,
    test_mask_img_paths,
    test_size=valtest_split,
    random_state=42,
)

In [9]:
def build_encoder(encoder_input_shape=(448, 448, 3)):
    """Build a frozen VGG16 encoder that returns the output of every layer.

    Stand-alone counterpart of `UnetVGG16.build_encoder`. As a free function it
    has no `self` to read the input shape from, so the shape is taken as an
    argument.

    Parameters
    ----------
    encoder_input_shape : tuple, optional
        Shape forwarded to `keras.applications.VGG16` as `input_shape`.
        Defaults to `(448, 448, 3)`.

    Returns
    -------
    keras.Model
        Model mapping the VGG16 input to the list of all VGG16 layer outputs
        (in layer order), with `trainable` set to False.
    """

    # =========
    #  Encoder
    # =========

    # Get base model (convolutional part only, ImageNet weights).
    vgg16 = keras.applications.VGG16(include_top=False,
                                     weights="imagenet",
                                     input_shape=encoder_input_shape)

    # Expose every layer output so callers can pick skip connections later.
    all_layer_outputs = [layer.output for layer in vgg16.layers]

    # Create encoder model
    encoder_model = keras.Model(inputs=vgg16.input, outputs=all_layer_outputs)

    # Freeze layers
    encoder_model.trainable = False

    return encoder_model


In [10]:
# Binds the function object itself to `m`; `build_encoder` is not called here,
# so no model is built by this cell.
# NOTE(review): if an encoder instance was intended, the call parentheses are
# missing — confirm.
m = build_encoder

In [11]:
class UnetVGG16:
    """Builder for a U-Net style model with a frozen, ImageNet-initialised VGG16 encoder.

    The encoder exposes the output of every VGG16 layer; five of them are used
    as skip connections. The decoder mirrors the encoder with five blocks, each
    made of a transposed convolution, a concatenation with the matching skip
    output, and two or three convolutions.

    Parameters
    ----------
    encoder_input_shape : tuple
        Shape of the model input, forwarded to VGG16 as `input_shape`.
    learning_rate, batch_size
        Stored on the instance for use by calling code; not read by this class.
    kernel_size : tuple
        Kernel size of every decoder convolution (including the final one).
    decoder_strides, decoder_padding, decoder_activation
        Strides, padding and activation of the transposed convolutions.
    output_channels : int, optional
        Number of filters of the final convolution. Defaults to 3.
    output_activation : str or None, optional
        Activation of the final convolution. Defaults to "relu".
        NOTE(review): a model trained with a cross-entropy loss on masks would
        normally end in a sigmoid/softmax — confirm whether "relu" is intended.
    """

    # Positions, within the encoder's list of per-layer outputs, of the feature
    # maps reused as skip connections (shallowest first). In the layer order of
    # `keras.applications.VGG16(include_top=False)` these are block1_conv2,
    # block2_conv2, block3_conv3, block4_conv3 and block5_conv3.
    _SKIP_LAYER_INDICES = (2, 5, 9, 13, 17)

    # One entry per decoder block, deepest first:
    #   (block number,
    #    divisor applied to the bottleneck channel count to obtain the filters
    #    of the transposed convolution,
    #    number of convolutions after the concatenation)
    _DECODER_BLOCKS = (
        (5, 1, 3),
        (4, 1, 3),
        (3, 2, 3),
        (2, 4, 2),
        (1, 4, 2),
    )

    def __init__(self, encoder_input_shape, learning_rate, batch_size, kernel_size,
                 decoder_strides, decoder_padding, decoder_activation,
                 output_channels=3, output_activation="relu"):

        self.encoder_input_shape = encoder_input_shape
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.kernel_size = kernel_size
        self.decoder_strides = decoder_strides
        self.decoder_padding = decoder_padding
        self.decoder_activation = decoder_activation
        self.output_channels = output_channels
        self.output_activation = output_activation

    def build_encoder(self):
        """Return a frozen VGG16 model whose outputs are all VGG16 layer outputs.

        Returns
        -------
        keras.Model
            Maps the VGG16 input to the list of every layer's output, in layer
            order; `trainable` is set to False.
        """

        # =========
        #  Encoder
        # =========

        # Get base model (convolutional part only, ImageNet weights).
        vgg16 = keras.applications.VGG16(include_top=False,
                                         weights="imagenet",
                                         input_shape=self.encoder_input_shape)

        # Expose every layer output; skip connections are picked from this list.
        all_layer_outputs = [layer.output for layer in vgg16.layers]

        # Create encoder model
        encoder_model = keras.Model(inputs=vgg16.input, outputs=all_layer_outputs)

        # Freeze layers
        encoder_model.trainable = False

        return encoder_model

    def _decoder_block(self, x, skip, block, up_filters, conv_filters, n_convs):
        """Apply one decoder block: transposed conv -> concat with `skip` -> `n_convs` convs.

        Layers are named `block<block>_up_convT`, `block<block>_up_concat` and
        `block<block>_up_conv<k>` with k counting down from `n_convs` to 1.
        """
        prefix = "block{}_up".format(block)

        x = keras.layers.Conv2DTranspose(name=prefix + "_convT",
                                         filters=up_filters,
                                         kernel_size=self.kernel_size,
                                         strides=self.decoder_strides,
                                         padding=self.decoder_padding,
                                         activation=self.decoder_activation)(x)

        x = keras.layers.Concatenate(name=prefix + "_concat", axis=-1)([x, skip])

        for conv_idx in range(n_convs, 0, -1):
            x = keras.layers.Conv2D(name="{}_conv{}".format(prefix, conv_idx),
                                    filters=conv_filters,
                                    kernel_size=self.kernel_size,
                                    strides=(1, 1),
                                    padding="same",
                                    activation="relu")(x)

        return x

    def build_unet(self):
        """Assemble and return the full encoder-decoder model.

        Returns
        -------
        keras.Model
            Model named "Unet_VGG16" mapping the input layer
            ("unet_input_layer") to the output of the final convolution
            ("final_up_conv").
        """

        # =============
        #  Input layer
        # =============

        unet_input = keras.Input(shape=self.encoder_input_shape, name="unet_input_layer")

        # =========
        #  Encoder
        # =========

        encoder_model = self.build_encoder()
        all_encoder_layer_outputs = encoder_model(unet_input)

        # Final encoder output (this is the input for the decoder).
        encoded_img = all_encoder_layer_outputs[-1]

        # Outputs to be used for skip connections.
        skip_outputs = [all_encoder_layer_outputs[i] for i in self._SKIP_LAYER_INDICES]

        # =========
        #  Decoder
        # =========

        # Channel count of the bottleneck; kept as an int so that every derived
        # filter count (via integer division) is an int as well.
        decoder_filters = int(encoded_img.shape[-1])

        # NOTE(review): every post-concatenation convolution keeps the full
        # bottleneck width, also in the shallow (high-resolution) blocks, and
        # blocks 2 and 1 share the same transposed-conv divisor — confirm that
        # this is intended rather than a copy-paste leftover.
        x = encoded_img
        for block, divisor, n_convs in self._DECODER_BLOCKS:
            x = self._decoder_block(x,
                                    skip=skip_outputs[block - 1],
                                    block=block,
                                    up_filters=decoder_filters // divisor,
                                    conv_filters=decoder_filters,
                                    n_convs=n_convs)

        # ------------------------------------------
        # Final conv layer
        final_img = keras.layers.Conv2D(name="final_up_conv",
                                        filters=self.output_channels,
                                        kernel_size=self.kernel_size,
                                        strides=(1, 1),
                                        padding="same",
                                        activation=self.output_activation)(x)

        # ======
        #  Unet
        # ======

        unet = keras.Model(inputs=unet_input, outputs=final_img, name="Unet_VGG16")

        return unet

In [12]:
# Configure the U-Net builder for 448x448 inputs with 3 channels.
unet = UnetVGG16(
    encoder_input_shape=(448, 448, 3),
    # Decoder convolution settings.
    kernel_size=(3, 3),
    decoder_strides=(2, 2),
    decoder_padding="same",
    decoder_activation=None,
    # Training hyper-parameters stored on the builder.
    learning_rate=0.001,
    batch_size=1,
)

In [13]:
# Assemble the full Keras model (encoder + decoder) from the configured builder.
unet_model = unet.build_unet()

In [14]:
# Print the per-layer overview: output shape, parameter count and connections.
unet_model.summary()

Model: "Unet_VGG16"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
unet_input_layer (InputLayer)   [(None, 448, 448, 3) 0                                            
__________________________________________________________________________________________________
model (Functional)              [(None, 448, 448, 3) 14714688    unet_input_layer[0][0]           
__________________________________________________________________________________________________
block5_up_convT (Conv2DTranspos (None, 28, 28, 512)  2359808     model[0][18]                     
__________________________________________________________________________________________________
block5_up_concat (Concatenate)  (None, 28, 28, 1024) 0           block5_up_convT[0][0]            
                                                                 model[0][17]            

In [15]:
# Render the model graph (with shapes and layer names) to a PNG file in the
# working directory. Requires `pydot` and graphviz to be installed.
keras.utils.plot_model(
    unet_model,
    to_file="./unet_model_v1.png",
    dpi=300,
    show_layer_names=True,
    show_shapes=True,
)

('You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) ', 'for plot_model/model_to_dot to work.')


In [16]:
# Use the learning rate configured on the `unet` builder instead of relying on
# Adam's implicit default, so that changing the configuration takes effect.
# NOTE(review): binary cross-entropy expects predictions in [0, 1], while the
# final layer built by `UnetVGG16.build_unet` defaults to a ReLU activation —
# confirm the output activation before training.
unet_model.compile(optimizer=keras.optimizers.Adam(learning_rate=unet.learning_rate),
                   loss="binary_crossentropy",
                   metrics=["accuracy"])

In [17]:
# Short trial run: 2 epochs of 5 steps each.
# NOTE(review): `GenerateInputs`, `x_train` and `y_train` are not defined in the
# visible part of this notebook; the recorded output of this cell is a
# NameError. They must be defined (or imported) in an earlier cell before this
# cell can run.
unet_model.fit(GenerateInputs(X=x_train, y=y_train), batch_size=1, steps_per_epoch = 5, epochs=2, verbose=2)

NameError: name 'GenerateInputs' is not defined