# Advanced Customization

Here we will build a one-dimensional convolutional neural network. We will customize the model, layer and kernel class of Tensorflow/Keras.

We are going to implement a layer whose kernels are built with a Gaussian kernel method. Usually, kernels are initialized with normally distributed random values,
but we want a kernel function that is smoother and vanishes on the left- and right-hand side, while still being random in some sense.


In [None]:
# Let's import the needed packages
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, models
import matplotlib.pyplot as plt
import numpy as np

from utils.utils import (
    calc_gaussians,
    gen_gaussian_kernel_v1_1D,
    load_data_cnn,
    plot_gaussian_weights,
    plot_derivative_energy
)

## Data

We investigate a one-dimensional model system of noninteracting spinless fermions with one particle in a hard-wall box and a
potential described by a linear combination of three Gaussians.
Using Numerov's method, the 1D Schrödinger equation for these potentials is solved
on a grid of $G = 500$ points. The solutions are then used to compute the data for the training.

The input data for the neural network is the density with dimension (500, 1); the target data are the kinetic energy with dimension (1) and the kinetic energy derivative with dimension (500). Each data set has a length of 100 (i.e. 100 samples). <br>
The density and the kinetic energy derivative differ in shape: each density sample is a list of single-element lists ( [ [value], [value], ... ] ), whereas each kinetic energy derivative sample is a flat list with a length of 500.

In [None]:
# Folder that contains the HDF5 files for training and testing
data_path = 'data/orbital_free_DFT/'

# Each call returns three arrays in the order:
# kinetic energy, kinetic energy derivative, density
train_split = load_data_cnn(
    data_name='M=100_training_data.hdf5',
    path=data_path
)
kinetic_train, kinetic_derivativ_train, density_train = train_split

test_split = load_data_cnn(
    data_name='test_data.hdf5',
    path=data_path
)
kinetic_test, kinetic_derivativ_test, density_test = test_split

# Report the shapes of the training arrays
print(f'Dimenstion of density data {np.shape(density_train)}')
print(f'Dimenstion of kinetic derivative data {np.shape(kinetic_derivativ_train)}')
print(f'Dimenstion of kinetic energy data {np.shape(kinetic_train)}')

## Custom Kernel

The kernel class describes how the kernel values are set and returns them.
We override the `__init__` and the `__call__` methods.

In [None]:
# Let's import the packages which are needed for the class.
from tensorflow.python.ops.init_ops_v2 import Initializer
from tensorflow.python.framework import dtypes
from tensorflow.python.ops.init_ops_v2 import _assert_float_dtype


class ContinuousConvKernel1DV1(Initializer):
    """Initializer that fills a kernel using a user-supplied factory function.

    Args:
        weights_init: Sequence of length 2. First entry is the mean, second
            entry is the standard deviation; neither may be negative.
        create_continuous_kernel: Callable that creates the values for a
            continuous kernel. It is invoked with the keyword arguments
            ``shape``, ``weights``, ``dtype`` and ``random_init``.
        random_init: Bool, forwarded unchanged to ``create_continuous_kernel``.
        seed: Optional integer. It is stored and reported by ``get_config``
            but is not forwarded to ``create_continuous_kernel``.

    Raises:
        ValueError: If no callable kernel factory is given, if
            ``weights_init`` does not have exactly two entries, or if one of
            its entries is negative.
    """
    def __init__(self,
                 weights_init,
                 create_continuous_kernel=None,
                 random_init=False,
                 seed=None):

        # Fail early instead of crashing later inside __call__ when the
        # factory is missing or not callable.
        if not callable(create_continuous_kernel):
            raise ValueError("Set a continuous kernel")

        # weights_init has to consist of exactly two values (mean and std)
        if len(weights_init) != 2:
            raise ValueError("weights_init length must be 2")
        # The mean must not be negative (zero is accepted)
        if weights_init[0] < 0:
            raise ValueError("'mean' must be positive float")
        # The standard deviation must not be negative (zero is accepted)
        if weights_init[1] < 0:
            raise ValueError("'stddev' must be positive float")

        self.weights_init = weights_init
        self.random_init = random_init
        self.create_continuous_kernel = create_continuous_kernel
        # Keep the seed so that it is not silently lost and can be reported
        # by get_config.
        self.seed = seed

    def __call__(self, shape, dtype=dtypes.float32):
        """Returns a tensor object initialized as specified by the initializer.

        The kernel values are produced by ``create_continuous_kernel``.

        Args:
          shape: Shape of the tensor.
          dtype: Optional dtype of the tensor. Only floating point types are
              supported.
        Raises:
          ValueError: If the dtype is not floating point
        """
        dtype = _assert_float_dtype(dtype)

        # Delegate the actual construction to the injected factory and
        # return the custom kernel it produces.
        return self.create_continuous_kernel(
            shape=shape,
            weights=self.weights_init,
            dtype=dtype,
            random_init=self.random_init)

    def get_config(self):
        """Returns the constructor arguments of this initializer as a dict.

        The keys match the parameter names of ``__init__`` so that
        ``cls(**config)`` rebuilds an equivalent initializer. Note that
        ``create_continuous_kernel`` is the callable itself.
        """
        return {
            "weights_init": list(self.weights_init),
            "create_continuous_kernel": self.create_continuous_kernel,
            "random_init": self.random_init,
            "seed": self.seed,
        }

## Custom Layers

We need two custom layers: one layer that uses the custom kernel initializer and one layer that integrates its input values.

In [None]:
class ContinuousConv1D(keras.layers.Conv1D):
    '''
    Conv1D layer whose kernel_initializer is always a ContinuousConvKernel1DV1.

    weights_init, create_continuous_kernel, random_init and seed are handed
    to ContinuousConvKernel1DV1; every other keyword argument is passed on
    to keras.layers.Conv1D. The kernel_initializer attribute is assigned
    after the parent constructor has run, so it replaces whatever the parent
    stored there.
    '''
    def __init__(self,
                 weights_init,
                 create_continuous_kernel,
                 random_init=False,
                 seed=None,
                 **kwargs):
        # Conv1D consumes all of its regular arguments first
        super().__init__(**kwargs)

        # Afterwards the kernel initializer is swapped for the custom one
        custom_initializer = ContinuousConvKernel1DV1(
            weights_init,
            create_continuous_kernel,
            random_init,
            seed,
        )
        self.kernel_initializer = custom_initializer

class IntegrateLayer(tf.keras.layers.Layer):
    '''
    Layer that approximates an integral along axis 1 with the trapezoidal rule.

    h: step width that the summed trapezoids are multiplied with.
    '''
    def __init__(self, h=1.0, **kwargs):
        super().__init__(**kwargs)
        self.h = h

    def call(self, inputs):
        # Neighbouring samples along axis 1: all but the last, all but the first
        left_points = inputs[:, :-1]
        right_points = inputs[:, 1:]
        # Mean of each pair of neighbours, i.e. the trapezoid heights
        trapezoid_heights = (left_points + right_points) / 2.
        summed = tf.reduce_sum(
            trapezoid_heights,
            axis=1,
            name='trapezoidal_integral_approx')
        return self.h * summed

    def get_config(self):
        # Extend the parent's config with the step width
        return {**super().get_config(), 'h': self.h}

In [None]:
class CustomCNNV1Model(keras.Model):
    '''
    Stack of ContinuousConv1D layers followed by a trapezoidal integration.

    filter_size: integer. Number of filters of each of the first
        layer_length convolutional layers.
    kernel_size: integer. Kernel size of every convolutional layer.
    layer_length: integer. Number of convolutional layers with filter_size
        filters. One additional layer with a single filter is always appended.
    create_continuous_kernel: method that creates the values for a continuous kernel
    kernel_regularizer: passed to every convolutional layer as kernel_regularizer.
    activation: passed to every convolutional layer as activation.
    dx: float. Integration step; also used to scale the derivative.
    weights: sequence with two entries. First entry is mean, second is std.
    '''
    def __init__(
            self,
            filter_size,
            kernel_size,
            layer_length,
            create_continuous_kernel,
            kernel_regularizer,
            activation,
            dx=0.002,
            weights=(5, 5)):
        super().__init__()
        self.dx = dx
        mean = weights[0]
        stddev = weights[1]

        def make_conv_layer(filters):
            # All layers share the same settings; only the filter count differs.
            return ContinuousConv1D(
                filters=filters,
                kernel_size=kernel_size,
                padding='same',
                weights_init=[mean, stddev],
                create_continuous_kernel=create_continuous_kernel,
                kernel_regularizer=kernel_regularizer,
                activation=activation,
                random_init=True,
            )

        # layer_length layers with filter_size filters each ...
        self.conv_layers = [
            make_conv_layer(filter_size) for _ in range(layer_length)
        ]
        # ... and the last layer is fixed to use a single filter
        self.conv_layers.append(make_conv_layer(1))
        self.integrate = IntegrateLayer(dx)

    @tf.function
    def call(self, inputs):
        '''
        Returns a dict with the integrated output 'T' and its derivative
        'dT_dn' with respect to the inputs.
        '''
        with tf.GradientTape() as tape:
            # inputs is not a variable, so it has to be watched explicitly
            tape.watch(inputs)
            # Calculate kinetic energy density tau by applying convolutional layers
            tau = inputs
            for layer in self.conv_layers:
                tau = layer(tau)
            # Kinetic energy T is the integral over the kinetic energy density
            T = self.integrate(tau)
        # The discretized derivative needs to be divided by dx
        dT_dn = tape.gradient(T, inputs)/self.dx
        return {'T': T, 'dT_dn': dT_dn}

In [None]:
# Wrap the training data in a tf.data.Dataset built from tensor slices.
# Advantage: the batch size and the number of repetitions
# can be defined right here.
training_dataset = (
    tf.data.Dataset.from_tensor_slices(
        (
            # Model input: the density, cast to float32
            density_train.astype(np.float32),
            # Targets, keyed like the outputs of the model
            {
                'T': kinetic_train.astype(np.float32),
                'dT_dn': kinetic_derivativ_train.astype(np.float32),
            },
        )
    )
    .batch(100)
    .repeat(10)
)

In [None]:
# Hyperparameters of the model and the training
epoch = 10
activation = 'softplus'
layer_length = 3
kernel_size = 100

# Now, we initialize the model ...
model = CustomCNNV1Model(
    filter_size=32,
    kernel_size=kernel_size,
    layer_length=layer_length,
    create_continuous_kernel=gen_gaussian_kernel_v1_1D,
    kernel_regularizer=tf.keras.regularizers.l2(0.00025),
    activation=activation,
    dx=0.002
)

# ... build it for inputs of shape (batch, 500, 1) ...
model.build(input_shape=(None, 500, 1))

# ... and compile it with one loss and one metric per output
model.compile(
    optimizer=tf.keras.optimizers.Adam(
        learning_rate=0.0001,
        amsgrad=False
    ),
    loss={'T': 'mse', 'dT_dn': 'mse'},
    # scale the loss in T by 0.2
    loss_weights={'T': 0.2, 'dT_dn': 1.0},
    metrics={'T': ['mae'], 'dT_dn': ['mae']}
)

# Remember the first weight array of the first layer before the training
weights_before_train = model.layers[0].get_weights()[0]

# Let's print the summary of our model
model.summary()

In [None]:
# tip: with tf.device('/device:GPU:0'): you can run/train the machine learning model on a specific GPU
#
#     with tf.device('/device:GPU:0'):
#         model.fit(....

# verbose=0: no print of the loss.
# verbose=1: print with loss and a loading bar (batch/epoch).
# verbose=2: print with loss.
model.fit(
    training_dataset,
    epochs=epoch,
    verbose=2,
    validation_data=(
        density_test,
        {'T': kinetic_test, 'dT_dn': kinetic_derivativ_test},
    ),
    validation_freq=2
)

In [None]:
# Read the first weight array of the first layer again, now after the training
weights_after_train = model.layers[0].get_weights()[0]
# Plot of 32 kernels with a length of 100, before and after the training
plot_gaussian_weights(weights_before_train, 'before')
plot_gaussian_weights(weights_after_train, 'after')

In [None]:
# 500 evenly spaced x values between 0 and 1
x = np.linspace(0, 1, 500)
# Plotting the energy derivative of the training data: reference values
# and the values of the trained model
plot_derivative_energy(x, kinetic_derivativ_train, model, density_train)