In [1]:
# markdown font settings
from IPython.display import display, HTML # type:ignore
style = '<style> body { font-family: Times New Roman; font-size: 16px; } </style>' 
display(HTML(style))

In [2]:
# importing tensorflow
import tensorflow as tf

In [3]:
#other imports 
from tensorflow import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from typing import List

# <div style="font-family:fantasy;">Tensors and operations</div>

In [4]:
t = tf.constant([[1.0,2.0,3.0],[4.0,5.0,6.0]]) #a matrix can often be said as a vector 
t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [5]:
tf.constant(42) # a scalar

<tf.Tensor: shape=(), dtype=int32, numpy=42>

A tensor is similar to numpy's ndarray. the above declared one is also a tensor. It does have the properties similar to numpy's ndarray. 

In [6]:
t.shape

TensorShape([2, 3])

In [7]:
t.dtype

tf.float32

indexing as well works like ndarray

In [8]:
t[:,:2]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[1., 2.],
       [4., 5.]], dtype=float32)>

other operations are available as well

In [9]:
t+10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [10]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

below is the code for the matrix product $t\,t^{T}$ (the tensor multiplied by its own transpose)

In [11]:
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

the difference with numpy is being depicted here

In [12]:
arr = np.array([[1.0,2.0,3.0],[4.0,5.0,6.0]])
arr

array([[1., 2., 3.],
       [4., 5., 6.]])

In [13]:
arr.dot(arr.T)

array([[14., 32.],
       [32., 77.]])

### <div style="font-family:fantasy;">keras' low level API</div>
we can use `keras.backend` to create functions available in tensorflow. If  you  want  to  write  code  that  will  be
portable to other Keras implementations, you should use these Keras functions.


In [14]:
K = keras.backend
K.square(K.transpose(t))+10

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
       [14., 35.],
       [19., 46.]], dtype=float32)>

### <div style="font-family:fantasy;">Tensors and numpy</div>
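NumPy arrays and tensors go hand in hand and can be used interchangeably: a tensor can be created from an array and vice versa, and operations from either library accept both.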


In [15]:
a = np.array([2.,4.,5.])

In [16]:
a.dtype

dtype('float64')

In [17]:
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [18]:
t.numpy()

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [19]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [20]:
np.square(t)

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)

In [21]:
np.transpose(t)

array([[1., 4.],
       [2., 5.],
       [3., 6.]], dtype=float32)

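<div style="border:2px solid;border-radius:4px;width:50vw;">note that NumPy uses 64-bit precision by default while TensorFlow uses 32-bit (compare <code>a.dtype</code> and <code>t.dtype</code> above). A tensor created from a NumPy array keeps the array's 64-bit type unless <code>dtype=tf.float32</code> is passed.</div>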

### <div style="font-family:fantasy;">Type conversions</div>
Type conversions can significantly hurt performance, and they can easily go unnoticed when they are done automatically. To avoid this, TensorFlow does not perform any type conversions automatically: it raises an exception for operations on tensors with incompatible types. For example, you cannot add a float tensor and an integer tensor, and you cannot even add a 32-bit float and a 64-bit float.


In [22]:
tf.constant(2.)+tf.constant(3.)


<tf.Tensor: shape=(), dtype=float32, numpy=5.0>

In [23]:
# tf.constant(2.)+tf.constant(3)
#this will give error


In [24]:
tf.constant(2.)+tf.constant(3.,dtype=tf.float32)


<tf.Tensor: shape=(), dtype=float32, numpy=5.0>

In [25]:
# tf.constant(2.)+tf.constant(3.,dtype=tf.float64)
# this will give error


we can do manual type conversion using `tf.cast()`

In [26]:
t2 = tf.constant(40.,dtype=tf.float64)
tf.constant(2.)+tf.cast(t2,tf.float32)


<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

`tf.Tensor` is immutable, so it cannot be used to store the weights of a DNN, which must be changed during backpropagation. Other parameters also need to change over time, so for these we use `tf.Variable`.

In [27]:
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]]) # type: ignore
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

We can modify a variable in place using the `assign()` method (or `assign_add()` and `assign_sub()`, which increment or decrement the variable by a given value). We can also update individual cells or slices, either by calling `assign()` on the cell or slice, or by using `scatter_update()` or `scatter_nd_update()`.

In [28]:
v.assign(2*v)


<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [29]:
v[0,1].assign(42)


<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [30]:
v[:,2].assign([0.,1.])


<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [31]:
v.scatter_nd_update(indices=[[0,0],[1,2]],updates=[100.,200.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

<div style="border:2px solid;border-radius:4px;width:50vw;">In  practice  you  will  rarely  have  to  create  variables  manually,  since
Keras provides an <code>add_weight()</code> method that will take care of it for
you,  as  we  will  see.  Moreover,  model  parameters  will  generally  be
updated  directly  by  the  optimizers,  so  you  will  rarely  need  to
update variables manually</div>

### <div style="font-family:fantasy;">Other data structures</div>
*Sparse tensors (`tf.SparseTensor`)*\
Efficiently  represent  tensors  containing  mostly  zeros.  The  tf.sparse  package
contains operations for sparse tensors.

*Tensor arrays (`tf.TensorArray`)*\
Are lists of tensors. They have a fixed size by default but can optionally be made
dynamic. All tensors they contain must have the same shape and data type.

*Ragged tensors (`tf.RaggedTensor`)*\
Represent  static  lists  of  lists  of  tensors,  where  every  tensor  has  the  same  shape
and data type. The tf.ragged package contains operations for ragged tensors.

*String tensors*\
Are regular tensors of type tf.string. These represent byte strings, not Unicode
strings,  so  if  you  create  a  string  tensor  using  a  Unicode  string  (e.g.,  a  regular
Python  3  string  like  "café"),  then  it  will  get  encoded  to  UTF-8  automatically
(e.g.,  b"caf\xc3\xa9").  Alternatively,  you  can  represent  Unicode  strings  using
tensors of type tf.int32, where each item represents a Unicode code point (e.g.,
[99, 97, 102, 233]). The tf.strings package (with an s) contains ops for byte
strings and Unicode strings (and to convert one into the other). It’s important to
note  that  a  tf.string  is  atomic,  meaning  that  its  length  does  not  appear  in  the
tensor’s  shape.  Once  you  convert  it  to  a  Unicode  tensor  (i.e.,  a  tensor  of  type
tf.int32 holding Unicode code points), the length appears in the shape.

*Sets*\
Are represented as regular tensors (or sparse tensors). For example,
`tf.constant([[1, 2], [3, 4]])` represents the two sets {1, 2} and {3, 4}. More generally,
each set is represented by a vector in the tensor’s last axis. You can
manipulate sets using operations from the tf.sets package.

*Queues*\
Store tensors across multiple steps. TensorFlow offers various kinds of queues:
simple First In, First Out (FIFO) queues (FIFOQueue), queues that can prioritize
some items (PriorityQueue), shuffle their items (RandomShuffleQueue), and
batch items of different shapes by padding (PaddingFIFOQueue). These classes are
all in the tf.queue package.


# <div style="font-family:fantasy;">customizing models and Training algorithms</div>


In [32]:
# preparing dataset: California housing, split into train / validation / test
# sets and standardised with statistics taken from the training set only
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
# first split: hold out the test set; the target is reshaped into a single column
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42) #type:ignore
# second split: carve the validation set out of the remaining training data
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

# the scaler is fitted on the training features only; the validation and test
# features are transformed with those same fitted statistics
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

### <div style="font-family:fantasy;">custom loss function</div>
The Huber loss is commonly used when the training set contains outliers, because it penalises large errors less heavily than the squared error does.
$$L(y, \hat{y}) = \begin{cases} 
\frac{1}{2}(y - \hat{y})^2, & \text{if } |y - \hat{y}| \leq \delta \\
\delta(|y - \hat{y}| - \frac{1}{2}\delta), & \text{otherwise}
\end{cases}
$$

As we can see, it uses a quadratic ($\ell_2$-style) error when the absolute difference between `y_pred` and `y_true` is at most $\delta$, and a linear ($\ell_1$-style) error otherwise.

This is available in `keras.losses.Huber`. but we can implement it as a custom loss function.

In [33]:
def huber_fn(y_true,y_pred):
    """Huber loss with the threshold fixed at 1, returned per element (no reduction).

    Elements whose absolute error is below 1 get the quadratic term
    ``error**2 / 2``; all other elements get the linear term ``|error| - 0.5``.
    """
    residual = y_true - y_pred
    abs_residual = tf.abs(residual)
    quadratic_term = tf.square(residual) / 2 # type: ignore
    linear_term = abs_residual - 0.5
    # pick, element by element, the branch that matches the size of the error
    return tf.where(abs_residual < 1, quadratic_term, linear_term)



In [34]:
# building layers
input_shape = X_train.shape[1:]

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [35]:
# compiling mode
model.compile(loss=huber_fn, optimizer="nadam", metrics=["mae"])

In [36]:
# fitting data
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2

Epoch 2/2


<keras.callbacks.History at 0x22b2d6c8c10>

It  is  also  preferable  to  return  a  tensor  containing  one  loss  per  instance,  rather  than
returning  the  mean  loss.  This  way,  Keras  can  apply  class  weights  or  sample  weights
when requested 

### <div style="font-family:fantasy;">Saving and Loading Models that contain custom components</div>



In [37]:
#saving model
model.save("./models/model_chapter_12_custom_fun.h5")

Whenever you load it, you’ll need to provide a dictionary that maps
the  function  name  to  the  actual  function.  More  generally,  when  you  load  a  model
containing custom objects, you need to map the names to the objects

In [38]:
model = keras.models.load_model("./models/model_chapter_12_custom_fun.h5",custom_objects={"huber_fn":huber_fn})

Here we have assumed the error threshold to be 1. If we want a different threshold, we can write a function that creates a configured loss function:

In [39]:
def create_huber(threshold=1.0):
    """Build a Huber loss function configured with the given threshold (delta).

    Args:
        threshold: absolute error at which the loss switches from the
            quadratic branch to the linear branch.

    Returns:
        A function ``huber_fn(y_true, y_pred)`` that returns one loss value
        per element (no reduction is applied).
    """
    def huber_fn(y_true,y_pred):
        error = y_true-y_pred
        abs_error = tf.abs(error)
        is_small_error = abs_error<threshold
        squared_loss = tf.square(error) / 2 # type: ignore
        # The linear branch must scale with the threshold so that both branches
        # agree at |error| == threshold; for threshold == 1 this reduces to
        # |error| - 0.5, the value that used to be hard-coded here.
        linear_loss = threshold*abs_error - threshold**2/2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn




In [40]:
# building layers
input_shape = X_train.shape[1:]

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", 
                       kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [41]:
#compiling model
model.compile(loss=create_huber(2.0),optimizer='nadam')


In [42]:
# fitting data
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2


Epoch 2/2


<keras.callbacks.History at 0x22b2e981750>

In [43]:
# saving thus model 
model.save('./models/model_chapter_12_custom_fun_2.h5')

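note that
the name to use is `"huber_fn"`, which is the name of the function you gave Keras, not
the name of the function that created it. Unfortunately, the `threshold` value is not saved with the model, so it has to be specified again when loading (here `create_huber(2.0)`).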

In [44]:
# retriving model
model = keras.models.load_model("./models/model_chapter_12_custom_fun_2.h5",custom_objects={'huber_fn':create_huber(2.0)})

You  can  solve  this  by  creating  a  subclass  of  the  `keras.losses.Loss`  class,  and  then
implementing its `get_config()` method:

In [45]:
class HuberLoss(keras.losses.Loss):
    """Huber loss whose threshold is stored in the config and so saved with the model.

    Args:
        threshold: absolute error at which the loss switches from the
            quadratic branch to the linear branch.
        **kwargs: forwarded unchanged to ``keras.losses.Loss``.
    """
    def __init__(self,threshold=1.0,**kwargs):
        self.threshold=threshold
        super().__init__(**kwargs)
    def call(self,y_true,y_pred):
        """Return the per-element Huber loss between labels and predictions."""
        error = y_true-y_pred
        abs_error = tf.abs(error)
        is_small_error = abs_error<self.threshold
        squared_loss = tf.square(error) / 2 # type: ignore
        # The linear branch must scale with the threshold so that both branches
        # agree at |error| == threshold; for threshold == 1 this reduces to
        # |error| - 0.5, the value that used to be hard-coded here.
        linear_loss = self.threshold*abs_error - self.threshold**2/2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        """Return the parent config extended with the ``threshold`` entry."""
        base_config=super().get_config()
        return {**base_config,'threshold':self.threshold}



* the constructor accepts `**kwargs` and passes them to the parent constructor, which handles the standard hyperparameters:
    * the name of the algorithm
    * the reduction algorithm to be used:
        * `"sum_over_batch_size"`, which means the loss will be the sum of the instance losses, weighted by the sample weights (if any), and divided by the batch size rather than by the sum of the weights, so it is not the weighted mean. Other possible values are `"sum"` and `"none"`.
* `call()` method takes the labels and predictions computes all the instances losses and returns them.
* `get_config()` returns a dictionary mapping each hyperparameter name to its value. It takes the parent class's dictionary and adds the new hyperparameters (or configuration entries) to it.

In [46]:
# building layers
input_shape = X_train.shape[1:]

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", 
                       kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [47]:
# compiling model
model.compile(loss=HuberLoss(2.0),optimizer='nadam')


In [48]:
# fitting model
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2


Epoch 2/2


<keras.callbacks.History at 0x22b2e884d90>

In [49]:
model.save("./models/model_chapter_12_custom_fun_3.h5")

In [50]:
# loading model
model = keras.models.load_model("./models/model_chapter_12_custom_fun_3.h5",custom_objects={'HuberLoss':HuberLoss})

When  you  save  a  model,  Keras  calls  the  loss  instance’s  `get_config()`  method  and
saves  the  config  as  JSON  in  the  HDF5  file.  When  you  load  the  model,  it  calls  the
<code class='language-python'>
from_config()
</code> 
class method on the HuberLoss class: this method is implemented by
the  base  class  (Loss)  and  creates  an  instance  of  the  class,  passing  `**config`  to  the
constructor.

### <div style="font-family:fantasy;">custom activations functions, initializer, regularizers, and constraints</div>



Here  are  examples  of  a  custom  activation
function  (equivalent  to  `keras.activations.softplus()`  or  `tf.nn.softplus()`),  a
custom  Glorot  initializer  (equivalent  to  `keras.initializers.glorot_normal()`),  a
custom  ℓ1  regularizer  (equivalent  to  `keras.regularizers.l1(0.01)`),  and  a  custom
constraint that ensures weights are all positive (equivalent to
`keras.constraints.nonneg()` or `tf.nn.relu()`):

In [51]:
def my_softplus(z):
    """Softplus activation written out by hand: ``log(exp(z) + 1)``."""
    # NOTE(review): exp(z) presumably overflows for large z; confirm whether
    # tf.nn.softplus should be preferred outside of this demonstration.
    exp_z = tf.exp(z)
    return tf.math.log(exp_z+1.0)
def my_glorot_initializer(shape,dtype=tf.float32):
    """Draw normally distributed weights with std ``sqrt(2 / (shape[0] + shape[1]))``."""
    first_dim, second_dim = shape[0], shape[1]
    spread = tf.sqrt(2.0/(first_dim+second_dim))
    return tf.random.normal(shape,stddev=spread,dtype=dtype)
def my_l1_regularizer(weights):
    """Return the sum of the absolute values of ``0.01 * weights``."""
    scaled_weights = 0.01*weights
    return tf.reduce_sum(tf.abs(scaled_weights))
def my_positive_weights(weights): # return value is same as tf.nn.relu(weights)
    """Replace every negative weight with zero and leave the others unchanged."""
    is_negative = weights<0.0
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative,zeros,weights)


In [52]:
keras.backend.clear_session()

In [53]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1, activation=my_softplus,
                       kernel_regularizer=my_l1_regularizer,
                       kernel_constraint=my_positive_weights,
                       kernel_initializer=my_glorot_initializer), # type: ignore
])

In [54]:
model.compile(loss="mse", optimizer="nadam", metrics=["mae"])

In [55]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2


Epoch 2/2


<keras.callbacks.History at 0x22b30c33dd0>

In [56]:
# saving model
model.save("./models/chapter_12_custom_fun_4.h5")

In [57]:
# loading model
model = keras.models.load_model(
    "./models/chapter_12_custom_fun_4.h5",
    custom_objects={
       "my_l1_regularizer": my_l1_regularizer,
       "my_positive_weights": my_positive_weights,
       "my_glorot_initializer": my_glorot_initializer,
       "my_softplus": my_softplus,
    })

The activation function will be applied to the output of this Dense layer, and its result
will  be  passed  on  to  the  next  layer.  The  layer’s  weights  will  be  initialized  using  the
value returned by the initializer. At each training step the weights will be passed to the
regularization function to compute the regularization loss, which will be added to the
main loss to get the final loss used for training. Finally, the constraint function will be
called  after  each  training  step,  and  the  layer’s  weights  will  be  replaced  by  the  con‐
strained weights.

If a function has hyperparameters that need to be saved along with the model, then
you will want to subclass the appropriate class, such as `keras.regularizers.Regularizer`,
`keras.constraints.Constraint`, `keras.initializers.Initializer`, or
`keras.layers.Layer` (for any layer, including activation functions). Much like we did
for the custom loss, here is a simple class for $\ell_1$ regularization that saves its `factor` hyperparameter:

In [58]:
#custom l1 regularizer
class MyL1Regularizer(keras.regularizers.Regularizer):
    """L1 penalty whose scaling ``factor`` is exposed through ``get_config()``."""

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        """Return the sum of the absolute values of ``factor * weights``."""
        scaled_weights = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled_weights))

    def get_config(self):
        """Return the hyperparameters needed to rebuild this regularizer."""
        config = {"factor": self.factor}
        return config
    

### <div style="font-family:fantasy;">Custom Metrics</div>


metrics (e.g., accuracy)
are used to evaluate a model: they must be more easily interpretable, and they can be
non-differentiable or have 0 gradients everywhere.

For each batch, Keras will compute the metric and keep track of its mean since the beginning of the epoch.

But this mean method will not be working with the precision (reason below)

<div style="border:2px solid;border-radius:4px;width:50vw;">
Suppose the model
made five positive predictions in the first batch, four of which were correct: that’s 80%
precision.  Then  suppose  the  model  made  three  positive  predictions  in  the  second
batch, but they were all incorrect: that’s 0% precision for the second batch. If you just
compute the mean of these two precisions, you get 40%. But wait a second—that’s not
the  model’s  precision  over  these  two  batches!  Indeed,  there  were  a  total  of  four  true
positives  (4  +  0)  out  of  eight  positive  predictions  (5  +  3),  so  the  overall  precision  is
50%,  not  40%.
</div>

Thus we need an object that keeps track of the number of true positives and false positives and computes their ratio when requested, rather than averaging the per-batch values. This is exactly what the following class does:
```python
keras.metrics.Precision
```

In [59]:
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1]) # argument order is (y_true, y_pred), i.e. ([actual],[predicted])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [60]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

thus here it takes the cumulative data and not the mean. This is called a *streaming metric* (or *stateful metric*), as it is gradually updated, batch after batch

At any point we can call the `result()` method to get the current value of the metric. We can also look at its variables by using the `variables` attribute, and we can reset them using `reset_states()`.

In [61]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [62]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [63]:
precision.reset_states()

In [64]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.0>

Now we define such a custom defined _streaming metric_ it will return the huber loss when the result is asked for.

In [65]:
class HuberMetric(keras.metrics.Metric):
    """Streaming metric that reports the mean Huber loss over all elements seen.

    Args:
        threshold: threshold handed to ``create_huber`` to build the loss function.
        **kwargs: forwarded unchanged to ``keras.metrics.Metric``.
    """
    def __init__(self,threshold=1.0,**kwargs):
        super().__init__(**kwargs)
        self.threshold=threshold
        self.huber_fn=create_huber(threshold)
        # The name is passed by keyword so the call does not depend on the
        # positional order of add_weight()'s parameters.
        self.total = self.add_weight(name="total",initializer="zeros")
        self.count = self.add_weight(name="count",initializer="zeros")
    def update_state(self,y_true,y_pred,sample_weight=None):
        """Accumulate the Huber losses of one batch.

        ``sample_weight`` is accepted for interface compatibility but is not used.
        """
        metric=self.huber_fn(y_true,y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true),tf.float32))
    def result(self):
        """Return ``total / count``, or 0 when nothing has been accumulated yet."""
        # divide_no_nan avoids returning NaN while count is still zero
        # (before the first update or right after a reset).
        return tf.math.divide_no_nan(self.total,self.count)
    def get_config(self):
        """Return the parent config extended with the ``threshold`` entry."""
        base_config=super().get_config()
        return {**base_config,"threshold":self.threshold}

in the above code:
* The constructor uses `self.add_weight()` to create the variables needed to keep track of the metric's state over multiple batches: here, the sum of all Huber losses (`total`) and the number of instances seen so far (`count`). Keras tracks any tf.Variable that is set as an attribute (and more generally, any “trackable” object, such as layers or models).

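* `update_state()` is called each time the metric object is used as a function (as we did with `precision`). It updates the metric state given the labels and predictions for one batch: the batch's Huber losses are summed and added to `total`, and the number of label elements is added to `count`. The `sample_weight` argument is accepted but ignored here.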

* the `result` method is kept separate as we are intended to take the ratio as and when required. so at the point of results the `update_state()` finishes operation then the `result()` method is called and is returned.

* Then we have `get_config()` this was done earlier ofcourse!!!


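When a metric is defined as a plain function, Keras already calls it for each batch and keeps track of the mean over the epoch, which is what this class does by hand. So the only benefit of our HuberMetric class is that the threshold will be saved.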
But  of  course,  some  metrics,  like  precision,  cannot  simply  be  averaged  over  batches:
in those cases, there’s no other option than to implement a streaming metric

### <div style="font-family:fantasy;">Custom Layers</div>

We may  simply  want  to  build  a  very  repetitive
architecture, containing identical blocks of layers repeated many times, and it would be convenient to treat each block of layers as a single layer. For example, if the model is a sequence of layers A, B, C, A, B, C, A, B, C, then you might want to define a custom layer D containing layers A, B, C, so your model would then simply be D, D, D.


1. Layers without weights, like `keras.layers.Flatten` or `keras.layers.ReLU`. If you want to create a custom layer without any weights, the simplest option is to write a function and wrap it in a `keras.layers.Lambda` layer.

In [66]:
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

Now this layer can be used like any other layer. Also it can be used as an activation function layer. The  exponential  layer  is  sometimes
used  in  the  output  layer  of  a  regression  model  when  the  values  to  predict  have  very different scales (e.g., 0.001, 10., 1,000.).

Now we create a custom stateful layer (i.e., a layer with weights) by inheriting from `keras.layers.Layer`

In [67]:
class MyDense(keras.layers.Layer):
    """Simplified fully connected layer computing ``activation(X @ kernel + bias)``.

    Args:
        units: number of output neurons (size of the last output dimension).
        activation: anything accepted by ``keras.activations.get``.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """
    def __init__(self,units,activation=None,**kwargs):
        super().__init__(**kwargs)
        self.units=units
        self.activation=keras.activations.get(activation)
    def build(self,batch_input_shape):
        """Create the kernel and bias once the size of the last input dimension is known."""
        self.kernel=self.add_weight(
            name="kernel",shape=[batch_input_shape[-1],self.units],
            initializer="glorot_normal"
        )
        self.bias=self.add_weight(
            name="bias",
            shape=[self.units],initializer="zeros"
        )
        super().build(batch_input_shape)
    def call(self,X):
        """Apply the affine transformation followed by the activation."""
        return self.activation(X@self.kernel+self.bias) #type:ignore
    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``."""
        # Normalise first: the shape may arrive as a plain tuple or list, which
        # has no as_list() method; a TensorShape passes through unchanged.
        input_dims = tf.TensorShape(batch_input_shape).as_list()
        return tf.TensorShape(input_dims[:-1]+[self.units])
    def get_config(self):
        """Return the parent config extended with ``units`` and the serialized activation."""
        base_config = super().get_config()
        return {
                **base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)
            }

In the above code:
* constructor(`__init__()`): It calls  the  parent  constructor,  passing  it  the  `kwargs`:  this  takes  care  of  standard arguments  such  as  `input_shape`,`trainable`,  and  `name`.  Then  it  saves  the  hyperparameters as attributes, converting the activation argument to the appropriate activation  function  using  the  `keras.activations.get()`  function (it  accepts functions, standard strings like `"relu"` or `"selu"`, or simply None)

* `def build(self,batch_input_shape)`: creates the layer’s variables by calling the `add_weight()` method for each weight. The `build()` method is called the first time the layer is used. At that point, Keras will know the shape of this layer’s inputs, and it will pass it to the `build()` method; this is often necessary to create some of the weights. For example, we need to know the number of neurons in the previous layer in order to create the connection weights matrix (i.e., the "kernel"): this corresponds to the size of the last dimension of the inputs. At the end of the `build()` method (and only at the end), we need to call the parent's `build()` method: this tells Keras that the layer is built (it just sets `self.built=True`).

* `def call(self,X)`: performs the layer's forward computation. It computes the matrix product of the inputs `X` and the kernel, adds the bias vector, and applies the activation function to the result; this gives the layer's output. No weights are modified here.

* `def compute_output_shape(self, batch_input_shape)`: returns the shape of this layer's outputs. In this case, it is the same shape as the inputs, except the last dimension is replaced with the number of neurons in the layer.

* `get_config()` is self explanatory.

<div style="border:2px solid;border-radius:4px;width:50vw;">we can  generally  omit  the  <code>compute_output_shape()</code>  method,  as
<code>tf.keras</code>  automatically  infers  the  output  shape,  except  when  the
layer is dynamic.  In other Keras implementations,
this method is either required or its default implementation
assumes the output shape is the same as the input shape.</div>

To create a layer with multiple inputs (e.g. `Concatenate`), the argument to the `call()` method should be a tuple containing all the inputs, and similarly the argument to the `compute_output_shape()` method should be a tuple containing each input's batch shape. To create a layer with multiple outputs, `call()` should return the list of outputs, and `compute_output_shape()` should return the list of batch output shapes (one per output).
Here is an example:
```python
class MyMultiLayer(keras.layers.Layer):
    """Toy layer taking two inputs and returning their sum, product and quotient."""
    def call(self, X):
        first, second = X
        total = first + second
        product = first * second
        quotient = first / second
        return [total, product, quotient]
    def compute_output_shape(self, batch_input_shape):
        first_shape, _ = batch_input_shape
        # every output is reported with the shape of the first input
        return [first_shape, first_shape, first_shape] # should probably handle broadcasting rules

Now note that we cannot use this in sequential api as that supports only one input and one output. 

Now for the layer to have different behaviour during training and testing like in `Dropout` and `BatchNormalization`, we must add a training argument to the <code style="font-size: medium;">call()</code> method and use this argument to decide what to do. 

here we create a layer that adds gaussian noise during the training and does nothing during the testing.

```python
class MyGaussianNoise(keras.layers.Layer):
    """Adds zero-mean Gaussian noise to its input when ``training`` is truthy."""
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev
    def call(self, X, training=None):
        # outside of training the input passes through untouched
        if not training:
            return X
        noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
        return X + noise
    def compute_output_shape(self, batch_input_shape):
        # the output has the same shape as the input
        return batch_input_shape

### <div style="font-family:fantasy;">Custom Models</div>
we can build a custom model by making a new class for the same that inherits `keras.Model` class.
Let's build the model depicted here

<img src="https://onedrive.live.com/embed?resid=3E88EC0719160DD3%21351737&authkey=%21ACut8oZofHZMnNw&width=454&height=316" width="300" height="200" />

The inputs go through a first dense layer, then through a residual block composed of
two  dense  layers  and  an  addition  operation  (a  residual
block adds its inputs to its outputs), then through this same residual block three more
times, then through a second residual block, and the final result goes through a dense
output layer.

To implement this model, it is best to first create
a `ResidualBlock` layer, since we are going to create a couple of identical blocks (and
we might want to reuse it in another model):

In [68]:
class ResidualBlock(keras.layers.Layer):
    """Stack of ``n_layers`` dense layers whose output is added back to the block's input."""
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        dense_stack = []
        for _ in range(n_layers):
            dense_stack.append(
                keras.layers.Dense(n_neurons, activation="elu",
                                   kernel_initializer="he_normal"))
        self.hidden = dense_stack

    def call(self, inputs):
        # feed the inputs through each dense layer in turn...
        transformed = inputs
        for dense in self.hidden:
            transformed = dense(transformed)
        # ...then add the untouched inputs back (the skip connection)
        return inputs + transformed

Now we use subclassing API to define our model using the `ResidualBlock`

In [69]:
class ResidualRegressor(keras.Model):
    """Regression model: dense layer, ``block1`` applied four times, ``block2``, dense output."""
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(
            30,
            activation=keras.activations.elu,
            kernel_initializer='he_normal',
        )
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        features = self.hidden1(inputs)
        # one initial pass plus three repeats through the same block
        # (the same layer object, hence the same weights, each time)
        for _ in range(4):
            features = self.block1(features)
        features = self.block2(features)
        return self.out(features)

In [70]:
keras.backend.clear_session()
X_new_scaled = X_test_scaled

In [71]:
model = ResidualRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=5)


Epoch 1/5


Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [72]:
score = model.evaluate(X_test_scaled, y_test)
y_pred = model.predict(X_new_scaled)



The  Model  class  is  a  subclass  of  the  Layer  class,  so  models  can  be  defined  and  used
exactly like layers. But a model has some extra functionalities, including of course its
compile(), fit(), evaluate(), and predict() methods (and a few variants), plus the
`get_layer()` method (which can return any of the model’s layers by name or by
index)  and  the  save()  method  (and  support  for  keras.models.load_model()  and
keras.models.clone_model())

In [73]:
model.save('./models/model_chapter_12_custom_model.ckpt') # NOTE(review): despite the .ckpt suffix this is presumably written in the SavedModel format (the log below reports an assets directory), not as a checkpoint or HDF5 file — confirm



INFO:tensorflow:Assets written to: ./models/model_chapter_12_custom_model.ckpt\assets


INFO:tensorflow:Assets written to: ./models/model_chapter_12_custom_model.ckpt\assets


In [74]:
model = keras.models.load_model("./models/model_chapter_12_custom_model.ckpt")

In [75]:
history = model.fit(X_train_scaled, y_train, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


we can also use sequential API for the same.

### <div style="font-family:fantasy;">Losses and Metric Based on training internals</div>
This can be helpful in regularization and also in keeping track of the internals of the model. 

To define a custom loss based on model internals, compute it based on any part of the
model you want, then pass the result to the `add_loss()` method. For example, let’s
build a custom regression MLP model composed of a stack of five hidden layers plus
an  output  layer.  This  custom  model  will  also  have  an  auxiliary  output  on  top  of  the
upper  hidden  layer.  The  loss  associated  to  this  auxiliary  output  will  be  called  the
reconstruction  loss:  it  is  the  mean  squared  difference  between  the
reconstruction and the inputs. By adding this reconstruction loss to the main loss, we
will  encourage  the  model  to  preserve  as  much  information  as  possible  through  the
hidden  layers—even  information  that  is  not  directly  useful  for  the  regression  task
itself.  In  practice,  this  loss  sometimes  improves  generalization  (it  is  a  regularization
loss)

In [76]:
class ReconstructingRegressor(keras.Model):
    """Regression MLP with an auxiliary input-reconstruction loss.

    Five SELU hidden layers feed a dense output layer. A second dense layer,
    created in ``build()``, maps the last hidden activations back to the input
    width; the mean squared difference between that reconstruction and the
    inputs is scaled by 0.05 and registered through ``add_loss()``.
    """

    def __init__(self, output_dim, **kwargs):
        """Create the hidden stack, the output layer and the error tracker.

        Args:
            output_dim: number of units of the final dense layer.
            **kwargs: forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        hidden_layers = []
        for _ in range(5):
            hidden_layers.append(
                keras.layers.Dense(
                    30, activation="selu", kernel_initializer="lecun_normal"
                )
            )
        self.hidden = hidden_layers
        self.out = keras.layers.Dense(output_dim)
        # Running mean of the reconstruction loss, reported as a metric.
        self.reconstruction_mean = keras.metrics.Mean(name="reconstruction_error")

    def build(self, batch_input_shape):
        """Create the reconstruction layer with one unit per input feature.

        Args:
            batch_input_shape: shape of the incoming batch; only its last
                entry is read.
        """
        self.reconstruct = keras.layers.Dense(batch_input_shape[-1])
        # super().build(batch_input_shape) is deliberately not called here (it
        # is disabled in this notebook).
        # NOTE(review): confirm it should stay disabled.

    def call(self, inputs, training=None):
        """Run the forward pass and register the reconstruction loss.

        Args:
            inputs: batch fed to the first hidden layer.
            training: when truthy, the reconstruction metric is also updated.

        Returns:
            The output layer applied to the last hidden activations.
        """
        activations = inputs
        for hidden_layer in self.hidden:
            activations = hidden_layer(activations)
        squared_error = tf.square(self.reconstruct(activations) - inputs)
        recon_loss = tf.reduce_mean(squared_error)
        # Scaled-down auxiliary loss, added to the model's list of losses.
        self.add_loss(0.05 * recon_loss)
        if training:
            self.add_metric(self.reconstruction_mean(recon_loss))
        return self.out(activations)


In this code:
* The constructor creates the DNN with five dense hidden layers and one dense output layer.
* The  `build()`  method  creates  an  extra  dense  layer  which  will  be  used  to  reconstruct the inputs of the model. It must be created here because its number of units must  be  equal  to  the  number  of  inputs,  and  this  number  is  unknown  before  the `build()` method is called
* The  `call()`  method  processes  the  inputs  through  all  five  hidden  layers,  then passes  the  result  through  the  reconstruction  layer,  which  produces  the  reconstruction.
* Next, the `call()` method computes the reconstruction loss (the <span style="font-family:Menlo Nerd Font;font-size:small;color:red">Mean squared difference</span> between the reconstruction and the inputs) and adds it to the model's list of losses using the `add_loss()` method. (Here we have scaled down the reconstruction loss by multiplying it by 0.05, just to make sure that the `recon_loss` does not dominate the main loss.)
* finally the Z is passed through the output layer i.e self.out()

In [77]:
# Clear the Keras backend session, then seed NumPy's and TensorFlow's random
# generators with 42 before the next model is created.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [78]:
# Fit a single-output ReconstructingRegressor for two epochs, then predict on
# the scaled test set.
model = ReconstructingRegressor(output_dim=1)
model.compile(optimizer="nadam", loss="mse")
history = model.fit(x=X_train_scaled, y=y_train, epochs=2)
y_pred = model.predict(x=X_test_scaled)

Epoch 1/2


Epoch 2/2


In [79]:
# Display the test-set predictions computed in the previous cell.
y_pred

array([[0.7758895],
       [1.9025637],
       [4.0374365],
       ...,
       [1.6079435],
       [2.892401 ],
       [4.2255177]], dtype=float32)

### <div style="font-family:fantasy;">Computing Gradients using autodiff</div>

consider the equation,\
$f(w_1,w_2) = 3w_1^2+2w_1w_2$
we can say that, $\frac{\partial{f(w_1,w_2)}}{\partial{w_1}} = 6w_1+2w_2$ also, $\frac{\partial{f(w_1,w_2)}}{\partial{w_2}} = 2w_1$.\
It was easy in case of just 2 parameters but will be harder in case of more parameters like in a DNN. Thus we use approximation.

### <div style="font-family:fantasy;">Custom Training Loops</div>
In some rare cases, such as wide and deep models, the `fit()` method does not work, because `fit()` uses only one optimizer while our model requires two. In such cases we need to write a custom training loop for our model.


In [80]:
# Clear the Keras backend session, then seed NumPy's and TensorFlow's random
# generators with 42 before the next model is created.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [81]:
# Two-layer regression network; both dense kernels share one L2 regularizer
# with factor 0.05.
l2_reg = keras.regularizers.l2(0.05)
hidden_layer = keras.layers.Dense(
    units=30,
    activation="elu",
    kernel_initializer="he_normal",
    kernel_regularizer=l2_reg,
)
output_layer = keras.layers.Dense(units=1, kernel_regularizer=l2_reg)
model = keras.models.Sequential(layers=[hidden_layer, output_layer])

Next we randomly sample the training set

In [82]:
def random_batch(X, y, batch_size=32):
    """Draw a random mini-batch, sampling positions with replacement.

    Args:
        X: features; must support ``len()`` and integer-array indexing.
        y: targets, indexed with the same positions as ``X``.
        batch_size: number of positions to draw.

    Returns:
        The pair ``(X[positions], y[positions])``.
    """
    positions = np.random.randint(len(X), size=batch_size)
    X_sample = X[positions]
    y_sample = y[positions]
    return X_sample, y_sample

Next we define a function that displays the training status: the number of steps done, the total number of steps, and the mean loss since the start of the epoch.

In [116]:
def print_status_bar(iteration, total, loss, metrics=None):
    """Print a one-line training progress report that overwrites itself.

    The line starts with a carriage return so successive calls redraw the same
    console line.

    Args:
        iteration: current progress count, shown before the slash.
        total: final count; once ``iteration`` reaches it the line is ended
            with a newline instead of being left open for overwriting.
        loss: object exposing ``.name`` and ``.result()``; always shown first.
        metrics: optional iterable of further objects with the same interface.
    """
    # list(...) lets callers pass a tuple or any other iterable, not only a list.
    tracked = [loss] + list(metrics or [])
    # No padding inside each entry: " - " is the only separator, so the line
    # carries no doubled or trailing spaces.
    summary = " - ".join(f"{m.name}: {m.result():.4f}" for m in tracked)
    end = "" if iteration < total else "\n"
    print(f"\r{iteration}/{total} - {summary}", end=end)

In [117]:
# Smoke-test the status bar: 50 simulated steps feeding two running means.
import time

mean_loss = keras.metrics.Mean(name="loss")
mean_square = keras.metrics.Mean(name="mean_square")
total_steps = 50
for step in range(1, total_steps + 1):
    loss = 1 / step  # simulated loss value for this step
    mean_loss(loss)
    mean_square(step ** 2)
    print_status_bar(step, total_steps, mean_loss, [mean_square])
    time.sleep(0.05)  # presumably so the line refresh is visible

50/50 - loss: 0.0900  - mean_square: 858.5000 


In [121]:
# Clear the Keras backend session, then seed NumPy's and TensorFlow's random
# generators with 42 before the training objects are created.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

Now we prepare to train our model, but before that we define some hyperparameters and choose the optimizer, the loss function and the metrics.

In [122]:
# Hyperparameters and training objects for the manual training loop.
n_epochs, batch_size = 5, 32
n_steps = len(X_train) // batch_size  # whole batches per epoch
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()  # running mean of the total loss
metrics = [keras.metrics.MeanAbsoluteError()]

In [125]:
# Manual training loop: for every epoch, run n_steps gradient updates on random
# mini-batches and refresh the status bar after each update.
for epoch in range(1, n_epochs + 1):
    print(f"Epoch {epoch}/{n_epochs}")
    for step in range(1, n_steps + 1):
        # Pass batch_size explicitly so the sampled batch always matches the
        # size that n_steps and the progress counter below are based on.
        X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size=batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # Total loss = main loss + the losses the model registered itself
            # (the L2 kernel penalties of this model).
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # Update the running mean loss and the metrics with this batch.
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    # Final call with iteration == total so the status line ends with a newline.
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    # Start the next epoch with fresh running averages.
    for metric in [mean_loss] + metrics:
        metric.reset_states()

Epoch1/5
11610/11610 - mean: 0.6530  - mean_absolute_error: 0.5219 
Epoch2/5
11610/11610 - mean: 0.6304  - mean_absolute_error: 0.5088 
Epoch3/5
11610/11610 - mean: 0.6650  - mean_absolute_error: 0.5221 
Epoch4/5
11610/11610 - mean: 0.6376  - mean_absolute_error: 0.5129 
Epoch5/5
11610/11610 - mean: 0.6207  - mean_absolute_error: 0.5054 


* we create two loops one for the epochs and other for the batches in each epoch
* then we sample the random batch from the training set
* Inside the `with tf.GradientTape() as tape:` block we compute the loss. First we compute the model's predictions for the batch inputs, then calculate the main loss using `tf.reduce_mean()`, because the loss function returns one loss per instance and we need their mean over the batch. Then we define the net loss, `loss`, by adding the main loss to the model's other losses, i.e. the regularization losses here. `tf.add_n()` sums multiple tensors of the same shape and data type.
* Next, as usual, we compute the gradients of the loss with respect to the trainable variables of the model using autodiff. After that we pass the gradients to the optimizer so that it performs a gradient descent step.
* Then we update the running mean loss and the metrics with the values of the current batch and display the status bar.
* finally after each epoch we reset the `mean_loss` and the `metrics`

If we add weight constraints using `kernel_constraint` or `bias_constraint`, we should update the loop to apply these constraints just after `apply_gradients()`:
```python
# Manual training loop with weight constraints applied after every update.
for epoch in range(1, n_epochs + 1):
    print(f"Epoch {epoch}/{n_epochs}")
    for step in range(1, n_steps + 1):
        # Pass batch_size explicitly so the sampled batch always matches the
        # size that n_steps and the progress counter below are based on.
        X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size=batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # Total loss = main loss + the losses the model registered itself.
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # here we insert the code: re-apply each variable's constraint (if it
        # has one) right after the optimizer step.
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))
        # Update the running mean loss and the metrics with this batch.
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    # Final call with iteration == total so the status line ends with a newline.
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    # Start the next epoch with fresh running averages.
    for metric in [mean_loss] + metrics:
        metric.reset_states()
```

Now this custom loop does not handle training and testing differently. To handle these we need to call model with <code style="font-size: medium;color: skyblue;">training</code><code style="font-size: medium;color:white"> = </code><code style="font-size: medium;color:rgb(0, 161, 254)">True</code> and make sure it propagates to every layer concerned with it.