In [1]:
import tensorflow as tf

2024-05-09 17:46:38.386883: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-05-09 17:46:38.406939: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-09 17:46:38.406959: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-09 17:46:38.407503: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-05-09 17:46:38.411365: I tensorflow/core/platform/cpu_feature_guar

In [2]:
class ConvBlock2D(tf.keras.layers.Layer):
    """
    Description:
    This class implements the basic convolution block of the architecture.
    It consists of an input 'Conv2D[1x1]-BatchNorm-ReLU' block followed by a number
    of hidden 'Conv2D[nxn]-BatchNorm-ReLU' blocks. A residual connection is added
    between the input and the output of every hidden block.

    - Every convolution is a depthwise-separable convolution
      (tf.keras.layers.SeparableConv2D) with "same" padding.

    - The activation is a LeakyReLU, not a plain ReLU.

    - The (1x1) convolution-layer can be used for dimensionality reduction.
      This idea was popularized by the authors of the InceptionNet architecture

    - Residual connections are used to promote better gradient flow during training
      This idea was popularized by the authors of the ResNET50 architecture
    """

    def __init__(
        self,
        num_of_filters: int,
        num_of_blocks: int = 3,
        kernel_size: tuple = (3,3),
        leaky_relu_slope: float = 0.10,
        use_dropout: bool = False ):

        """
        Description:
        Class Constructor.

        Arguments:
         - num_of_filters: (int) number of convolution kernels per convolution layer.
         - num_of_blocks: (int) number of Conv2D-BatchNorm-ReLU blocks. Must be at least 2.
           The default value is 3: 1 input block and 2 hidden blocks
         - kernel_size: (tuple of ints) kernel dimensions for hidden convolution blocks in pixels (height x width). The default is (3,3)
         - leaky_relu_slope: (float) ReLU slope for negative inputs. The default is 0.10.
           Only the magnitude is used (abs() is applied), so the sign of the argument is irrelevant.
         - use_dropout: (bool) when True, a Dropout layer (rate 0.05) is applied to the output of
           the last block. The default is False, i.e. no Dropout is applied.

        Raises:
         - AssertionError: if one of the numeric arguments is out of range.
        """

        assert num_of_filters > 0, "num_of_filters must be positive"
        assert num_of_blocks > 1, "num_of_blocks must be at least 2 (1 input block + 1 hidden block)"
        assert len(kernel_size) == 2, "kernel_size must contain exactly two values (height, width)"
        assert kernel_size[0] > 0 and kernel_size[1] > 0, "kernel_size values must be positive"

        # The Keras base class is initialised before any attribute is assigned to the instance
        super().__init__()

        # Human-readable description of the block. It is only stored as an attribute;
        # it is not passed to Keras as the layer name.
        self._layer_name = "[Conv2D[1x1]*{input_depth}-BatchNorm-ReLU]->[Conv2D[{height}x{width}]*{hidden_depth}-BatchNorm-ReLU]*{blocks}".format(
            input_depth = num_of_filters,
            height = kernel_size[0],
            width = kernel_size[1],
            hidden_depth = num_of_filters,
            blocks = num_of_blocks - 1
        )

        # Total number of Conv2D-BN-ReLU blocks
        self._num_of_blocks = num_of_blocks

        # Whether the Dropout layer is applied at the end of call()
        self._use_dropout = use_dropout

        # Lists for storing Conv2D, BatchNorm, ReLU and Residual layer instances.
        # Index 0 of the residual list is None because the input block has no residual connection.
        self._conv_layers = []
        self._batch_norm_layers = [ tf.keras.layers.BatchNormalization() for i in range(self._num_of_blocks) ]
        self._relu_layers = [ tf.keras.layers.LeakyReLU( alpha = abs(leaky_relu_slope)) for i in range(self._num_of_blocks) ]
        self._residual_layers = [ tf.keras.layers.Add() if i > 0 else None for i in range(self._num_of_blocks) ]

        # Optional Dropout layer (applied after the last Conv2D-BN-ReLU block when use_dropout is True)
        self._dropout_rate = 0.05
        self._dropout_layer = tf.keras.layers.Dropout(rate = self._dropout_rate)

        # Block 0 uses a (1,1) kernel, every following block uses the requested kernel size
        for i in range(self._num_of_blocks):
            self._conv_layers.append(
                tf.keras.layers.SeparableConv2D(
                    filters = num_of_filters,
                    kernel_size = kernel_size if i > 0 else (1,1),
                    padding = "same"
                )
            )

    def call(self, inputs, training = False):
        """
        Description:
        Forward pass. Applies the Conv2D-BatchNorm-ReLU blocks one after the other and
        adds the input of every hidden block to its output.

        Arguments:
         - inputs: input tensor of the first (1x1) convolution.
           NOTE(review): presumably (batch, height, width, channels) - confirm against the caller.
         - training: (bool) forwarded to the BatchNormalization layers (and to the Dropout layer when enabled).

        Returns:
         - The output tensor of the last block (after Dropout when use_dropout is True).
        """

        # Input tensor of the current Conv2D-BN-ReLU block
        block_input = inputs

        # Output tensor of the current Conv2D-BN-ReLU block
        block_output = None

        for i in range(self._num_of_blocks):

            # Conv2D -> BatchNorm -> ReLU
            block_output = self._conv_layers[i](block_input)
            block_output = self._batch_norm_layers[i](block_output, training = training)
            block_output = self._relu_layers[i](block_output)

            # Residual connection. The first block is skipped: its input does not come from
            # one of this layer's own convolutions, so it has no matching Add layer.
            if i > 0:
                block_output = self._residual_layers[i]([block_output, block_input])

            # The output of this block is the input of the next one
            block_input = block_output

        # Optional regularisation of the final feature maps
        if self._use_dropout:
            block_output = self._dropout_layer(block_output, training = training)

        return block_output

In [3]:
class TemporalUNET(tf.keras.models.Model):

    """
    Description:
    U-Net style encoder/decoder network for sequences of 2D frames.

    - Encoder: one time-distributed ConvBlock2D per resolution level; consecutive levels
      are separated by a time-distributed (2,2) max-pooling layer.
    - Bridge: the encoder output of every level goes through a ConvLSTM2D layer
      (return_sequences = True) and is added back to the ConvLSTM2D output.
    - Decoder: starting at the coarsest level, the bridge output of a level is concatenated
      (last axis) with the upsampled decoder output of the level below it and fed to a
      time-distributed ConvBlock2D, followed by a (2,2) bilinear upsampling layer
      (all levels except level 0).
    - Output: a ConvLSTM2D layer with 2 filters, sigmoid activation and
      return_sequences = False, i.e. one output frame per input sequence.

    Because pooling and upsampling both use a factor of 2, the input height and width
    should be divisible by 2 ** (num_of_levels - 1); otherwise the tensors that are
    concatenated on the decoder path do not have matching spatial dimensions.
    """

    def __init__(
        self,
        num_of_prediction_steps: int,
        num_of_levels: int, 
        num_of_filters: list, 
        conv_blocks_per_level: int = 3, 
        kernel_size: tuple = (3,3), 
        leaky_relu_slope: float = 0.10,
        verbose: bool = True,
        use_lstm_batch_norm: bool = False):

        """
        Description:
        Class Constructor.

        Arguments:
         - num_of_prediction_steps: (int) stored on the instance.
           NOTE(review): not used anywhere in this class yet - confirm the intended use.
         - num_of_levels: (int) number of resolution levels. Must be at least 1.
         - num_of_filters: (list of ints) entry i is the number of filters used by the encoder block,
           the ConvLSTM2D layer and the decoder block of level i. Needs at least num_of_levels entries.
         - conv_blocks_per_level: (int) forwarded to ConvBlock2D as num_of_blocks.
         - kernel_size: (tuple of ints) kernel dimensions of the ConvBlock2D and ConvLSTM2D layers.
         - leaky_relu_slope: (float) forwarded to ConvBlock2D.
         - verbose: (bool) when True (default) call() prints the shape of the intermediate tensors.
           With graph execution this happens while the graph is traced. Pass False to silence it.
         - use_lstm_batch_norm: (bool) when True a BatchNormalization layer is applied to the
           encoder output before it enters the ConvLSTM2D layer. The default is False.

        Raises:
         - ValueError: if num_of_levels < 1 or num_of_filters has fewer than num_of_levels entries.
        """

        if num_of_levels < 1:
            raise ValueError("num_of_levels must be at least 1, got {}".format(num_of_levels))

        if len(num_of_filters) < num_of_levels:
            raise ValueError(
                "num_of_filters must provide one entry per level: expected at least {} entries, got {}".format(
                    num_of_levels, len(num_of_filters)
                )
            )

        super().__init__()
        
        self._num_of_levels = num_of_levels
        self._num_of_prediction_steps = num_of_prediction_steps
        self._print_shapes = verbose
        self._use_lstm_batch_norm = use_lstm_batch_norm
        
        # Time-Distributed,  2D-Convolution layers on the encoder path 
        self._encoder_conv_blocks_2D = [None] * self._num_of_levels

        # Time-Distributed, 2D-Convolution layers on the decoder path
        self._decoder_conv_blocks_2D = [None] * self._num_of_levels

        # 2D-Conv-LSTM layers between the encoder and decoder path 
        self._lstm_layers_2D = [None] * self._num_of_levels

        # Optional BatchNorm layers (only applied when use_lstm_batch_norm is True)
        self._batch_norm_layers = [None] * self._num_of_levels
        
        # Residual connections (element-wise addition) between Conv-LSTM input and output tensors
        self._residual_layers = [None] * self._num_of_levels

        # Skip connections (concatenation along the last axis) on the decoder path
        self._concatenate_layers = [None] * self._num_of_levels
        
        # Downsampling layers on the encoder path (the entry of the last level stays None)
        self._pooling_layers = [None] * self._num_of_levels

        # Upsampling layers on the decoder path (the entry of level 0 stays None)
        self._upsampling_layers = [None] * self._num_of_levels

        # Build the encoder network / contracting path 
        for i in range(self._num_of_levels): 
            self._encoder_conv_blocks_2D[i] = ConvBlock2D(
                num_of_filters = num_of_filters[i],
                num_of_blocks = conv_blocks_per_level, 
                kernel_size = kernel_size, 
                leaky_relu_slope = leaky_relu_slope                
            )

            self._encoder_conv_blocks_2D[i] = tf.keras.layers.TimeDistributed(self._encoder_conv_blocks_2D[i])

            # The last resolution level on the encoder path does not require a Pooling layer 
            if i != self._num_of_levels - 1: 
                self._pooling_layers[i] = tf.keras.layers.MaxPooling2D(pool_size = (2,2), padding = "valid")
                self._pooling_layers[i] = tf.keras.layers.TimeDistributed(self._pooling_layers[i])

        # Add Conv2D-LSTM layers and residual connections between the corresponding encoder and decoder levels.
        for i in range(self._num_of_levels):
            self._lstm_layers_2D[i] = tf.keras.layers.ConvLSTM2D(
                filters = num_of_filters[i],
                kernel_size = kernel_size, 
                padding = "same",
                return_sequences = True, 
                return_state = False, 
                dropout = 0.0,
                recurrent_dropout = 0.0,
            )

            self._batch_norm_layers[i] = tf.keras.layers.BatchNormalization()
            self._residual_layers[i] = tf.keras.layers.Add()

        # Build the decoder network / expanding path
        for i in range(self._num_of_levels): 
            self._decoder_conv_blocks_2D[i] = ConvBlock2D(
                num_of_filters = num_of_filters[i], 
                num_of_blocks = conv_blocks_per_level, 
                kernel_size = kernel_size, 
                leaky_relu_slope = leaky_relu_slope
            )

            self._decoder_conv_blocks_2D[i] = tf.keras.layers.TimeDistributed(self._decoder_conv_blocks_2D[i])

            # The first resolution level on the decoder path does not require an Upsampling layer
            if i != 0: 
                self._upsampling_layers[i] = tf.keras.layers.UpSampling2D(size = (2,2), interpolation = "bilinear")
                self._upsampling_layers[i] = tf.keras.layers.TimeDistributed(self._upsampling_layers[i])

            # The last resolution level has nothing below it to concatenate with
            if i != self._num_of_levels - 1: 
                self._concatenate_layers[i] = tf.keras.layers.Concatenate(axis = -1)

        # Collapses the time axis (return_sequences = False) into a single 2-channel frame
        self._output_layer = tf.keras.layers.ConvLSTM2D(
            filters = 2, 
            kernel_size = kernel_size, 
            padding = "same", 
            activation = "sigmoid", 
            return_sequences = False,
            return_state = False,
        )

    def _debug_print(self, message):
        """
        Print `message` when the model was created with verbose = True, otherwise do nothing.
        """

        if self._print_shapes:
            print(message)
    
    def call(self, inputs, training = False):
        """
        Description:
        Forward pass through the encoder, the ConvLSTM bridge, the decoder and the output layer.

        Arguments:
         - inputs: input sequence tensor.
           NOTE(review): presumably (batch, time, height, width, channels) - confirm against the caller.
         - training: (bool) forwarded to the convolution blocks, the ConvLSTM2D layers and
           (when enabled) the BatchNormalization layers.

        Returns:
         - The output tensor of the final ConvLSTM2D layer (2 filters, sigmoid activation, no time axis).
        """

        temp = inputs
        
        encoder_outputs = [None] * self._num_of_levels
        lstm_outputs = [None] * self._num_of_levels
        decoder_outputs = [None] * self._num_of_levels

        # Define the computation graph of the contracting path / encoder network
        for i in range(self._num_of_levels): 
            self._debug_print("encoder path: {}".format(i))
            encoder_outputs[i] = self._encoder_conv_blocks_2D[i](temp, training = training)
            self._debug_print(tf.keras.backend.int_shape(encoder_outputs[i]))
            
            # The pooled tensor is the input of the next (coarser) level
            if i != self._num_of_levels - 1:
                temp = self._pooling_layers[i](encoder_outputs[i])

        self._debug_print("")
        
        # Define the computation graph of the ConvLSTM layers between the encoder and decoder
        for i in range(self._num_of_levels): 
            self._debug_print("LSTM path: {}".format(i))

            # Optional normalisation of the ConvLSTM input
            lstm_input = encoder_outputs[i]
            if self._use_lstm_batch_norm:
                lstm_input = self._batch_norm_layers[i](lstm_input, training = training)

            # The residual connection always uses the un-normalised encoder output
            lstm_outputs[i] = self._lstm_layers_2D[i](lstm_input, training = training)
            lstm_outputs[i] = self._residual_layers[i]([encoder_outputs[i], lstm_outputs[i]])
            self._debug_print(tf.keras.backend.int_shape(lstm_outputs[i]))

        self._debug_print("")
        
        # Define the computation graph of the expanding path / decoder network
        # (the levels are visited from the coarsest one up to level 0)
        for i in range(self._num_of_levels - 1, -1, -1):
            self._debug_print("Decoder path: {}".format(i))
            temp = lstm_outputs[i]
            
            # Concatenation with the (already upsampled) decoder output of the level below
            if i != self._num_of_levels - 1:
                temp = self._concatenate_layers[i]([lstm_outputs[i], decoder_outputs[i+1]])

            self._debug_print("concat shape: " + str(tf.keras.backend.int_shape(temp)))
            
            # Convolution 
            decoder_outputs[i] = self._decoder_conv_blocks_2D[i](temp, training = training)

            self._debug_print("conv shape: " + str(tf.keras.backend.int_shape(decoder_outputs[i])))

            # Upsampling (brings the tensor to the resolution of level i - 1)
            if i != 0:
                decoder_outputs[i] = self._upsampling_layers[i](decoder_outputs[i])

            self._debug_print("upsampling size: " + str(tf.keras.backend.int_shape(decoder_outputs[i])))

        output_tensor = self._output_layer(decoder_outputs[0], training = training)

        self._debug_print("")
        self._debug_print("tensor output: " + str(tf.keras.backend.int_shape(output_tensor)))
        
        return output_tensor

In [9]:
# Five resolution levels; entry k of num_of_filters is the filter count of level k
model = TemporalUNET(
    num_of_prediction_steps = 1,
    num_of_levels = 5,
    num_of_filters = [8, 12, 16, 24, 32],
)

# Create the weights for the given input shape, then list the layers and parameter counts
model.build(input_shape = (None, 10, 512, 512, 2))
model.summary()

encoder path: 0
(None, 10, 512, 512, 8)
encoder path: 1
(None, 10, 256, 256, 12)
encoder path: 2
(None, 10, 128, 128, 16)
encoder path: 3
(None, 10, 64, 64, 24)
encoder path: 4
(None, 10, 32, 32, 32)

LSTM path: 0
(None, 10, 512, 512, 8)
LSTM path: 1
(None, 10, 256, 256, 12)
LSTM path: 2
(None, 10, 128, 128, 16)
LSTM path: 3
(None, 10, 64, 64, 24)
LSTM path: 4
(None, 10, 32, 32, 32)

Decoder path: 4
concat shape: (None, 10, 32, 32, 32)
conv shape: (None, 10, 32, 32, 32)
upsampling size: (None, 10, 64, 64, 32)
Decoder path: 3
concat shape: (None, 10, 64, 64, 56)
conv shape: (None, 10, 64, 64, 24)
upsampling size: (None, 10, 128, 128, 24)
Decoder path: 2
concat shape: (None, 10, 128, 128, 40)
conv shape: (None, 10, 128, 128, 16)
upsampling size: (None, 10, 256, 256, 16)
Decoder path: 1
concat shape: (None, 10, 256, 256, 28)
conv shape: (None, 10, 256, 256, 12)
upsampling size: (None, 10, 512, 512, 12)
Decoder path: 0
concat shape: (None, 10, 512, 512, 20)
conv shape: (None, 10, 512, 512,

In [286]:
class EncoderDEM(tf.keras.models.Model):

    """
    This class implements a Convolutional Encoder architecture which is meant to be 
    used as a feature extractor for Digital Elevation Maps. Feature maps are extracted 
    sequentially starting from high spatial resolution and proceeding to lower
    spatial resolutions with Downsampling/Pooling layers.
    """

    def __init__(
        self, 
        num_of_levels: int,
        num_of_filters: list,
        conv_blocks_per_level: int = 3,
        kernel_size: tuple = (3,3),
        leaky_relu_slope: float = 0.10):
        
        """
        Description: 
        Class Constructor

        Arguments: 
         - num_of_levels: (int) total number of resolution levels. Every level feeds its output
           to the next level through a pooling layer. Must be at least 1.
         - num_of_filters: (list of ints) entry i is the number of kernels per convolution layer
           of level i. Needs at least num_of_levels entries. See class ConvBlock2D for more
         - conv_blocks_per_level: (int) number of convolution layers per resolution level. See class ConvBlock2D for more
         - kernel_size: (tuple) kernel dimensions (height x width). See class ConvBlock2D for more
         - leaky_relu_slope: (float) ReLU slope for negative input/arguments. See class ConvBlock2D for more

        Raises:
         - ValueError: if num_of_levels < 1 or num_of_filters has fewer than num_of_levels entries.
        """

        if num_of_levels < 1:
            raise ValueError("num_of_levels must be at least 1, got {}".format(num_of_levels))

        if len(num_of_filters) < num_of_levels:
            raise ValueError(
                "num_of_filters must provide one entry per level: expected at least {} entries, got {}".format(
                    num_of_levels, len(num_of_filters)
                )
            )

        # Keras requires the base Model to be initialised before layers are attached to the instance
        super().__init__()

        # Store total number of resolution levels as a separate class attribute 
        self._num_of_levels = num_of_levels

        # One ConvBlock2D instance per resolution level
        self._conv_blocks_2D = [None] * self._num_of_levels

        # One MaxPooling2D instance per resolution level (the entry of the last level stays None)
        self._pooling_layers = [None] * self._num_of_levels

        # Fill in the lists with keras.Layer instances
        for i in range(self._num_of_levels): 
            self._conv_blocks_2D[i] = ConvBlock2D(
                num_of_filters = num_of_filters[i],
                num_of_blocks = conv_blocks_per_level, 
                kernel_size = kernel_size, 
                leaky_relu_slope = leaky_relu_slope
            )

            # The last resolution level does not perform a pooling operation on its output
            if i != self._num_of_levels - 1: 
                self._pooling_layers[i] = tf.keras.layers.MaxPooling2D(pool_size = (2,2), padding = "valid")

    def call(self, inputs, training = False):
        
        """
        Description:
        Forward pass. Runs the convolution block of every level and pools its output
        before handing it to the next level.

        Arguments:
         - inputs: input tensor of the first convolution block.
           NOTE(review): presumably (batch, height, width, channels) - confirm against the caller.
         - training: (bool) forwarded to every ConvBlock2D.

        Returns:
         - A list with one feature map per resolution level; entry i is the (un-pooled)
           output of the convolution block of level i.
        """

        # One feature map per resolution level
        outputs = [None] * self._num_of_levels

        # Input of the current level: the model input first, then the pooled output of the previous level
        temp = inputs
        
        for i in range(self._num_of_levels): 
            outputs[i] = self._conv_blocks_2D[i](temp, training = training)

            # The last conv-block does not feed its output to a pooling layer
            if i != self._num_of_levels - 1:
                temp = self._pooling_layers[i](outputs[i])
                                                 
        return outputs