In [0]:
%tensorflow_version 2.x

In [0]:
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras

# Custom Loss function

In [0]:
def huber_loss(y_true,y_pred):
  """Element-wise Huber loss with a fixed threshold of 1.

  Residuals whose magnitude is below 1 are penalised quadratically
  (``error**2 / 2``); all other residuals are penalised linearly
  (``|error| - 0.5``).

  Args:
    y_true: Target values.
    y_pred: Predicted values; must be subtractable from ``y_true``.

  Returns:
    The per-element loss selected by ``tf.where``.
  """
  residual = y_true - y_pred
  magnitude = tf.abs(residual)
  quadratic_branch = tf.square(residual) / 2
  linear_branch = magnitude - 0.5
  return tf.where(magnitude < 1, quadratic_branch, linear_branch)

# Compile using the plain-function Huber loss.
# NOTE(review): `model` is not created in any cell shown before this one.
model.compile(optimizer='nadam', loss=huber_loss)

# Reload the saved model, resolving the name "huber_loss" to the function.
model = keras.models.load_model(
    "my_model_with_a_custom_loss.h5",
    custom_objects={"huber_loss": huber_loss},
)

In [0]:
def create_huber(threshold=1.0):
  """Build a Huber loss function for a given threshold.

  Args:
    threshold: Absolute-error value at which the loss switches from the
      quadratic branch to the linear branch. Defaults to 1.0.

  Returns:
    A function ``huber_fn(y_true, y_pred)`` that returns the element-wise
    Huber loss for that threshold.
  """
  def huber_fn(y_true,y_pred):
    error = y_true - y_pred
    abs_error = tf.abs(error)
    is_small_error = abs_error < threshold
    squared_loss = tf.square(error) / 2
    # Offset the linear branch so both branches meet at |error| == threshold.
    linear_loss = threshold * abs_error - threshold ** 2 / 2
    return tf.where(is_small_error, squared_loss, linear_loss)
  return huber_fn

# Compile with a Huber loss built for a threshold of 0.9.
# NOTE(review): `model` is not created in any cell shown before this one.
model.compile(
    loss = create_huber(threshold=0.9),
    optimizer = 'nadam'
)

# The compiled loss is the inner function returned by the factory, so it is
# looked up under the name "huber_fn". The value must be a built loss function
# (with the same threshold), not the factory itself.
model = keras.models.load_model(
    "my_model_with_a_custom_loss.h5",
    custom_objects={"huber_fn": create_huber(threshold=0.9)}
)

In [0]:
class HuberLoss (keras.losses.Loss):
  """Huber loss with a configurable threshold.

  Args:
    threshold: Absolute-error value at which the loss switches from the
      quadratic branch to the linear branch. Defaults to 1.0.
    **kwargs: Forwarded unchanged to ``keras.losses.Loss``.
  """
  def __init__(self,threshold=1.0,**kwargs):
    self.threshold = threshold
    super().__init__(**kwargs)

  def call(self,y_true,y_pred):
    """Return the element-wise Huber loss for ``self.threshold``."""
    error = y_true - y_pred
    abs_error = tf.abs(error)
    is_small_error = abs_error < self.threshold
    squared_loss = tf.square(error) / 2
    # Offset the linear branch so both branches meet at |error| == threshold.
    linear_loss = self.threshold * abs_error - self.threshold ** 2 / 2
    return tf.where(is_small_error, squared_loss, linear_loss)

  def get_config(self):
    """Return the base config extended with ``threshold``."""
    base_config = super().get_config()
    return {**base_config, "threshold": self.threshold}

# Compile with a HuberLoss instance whose threshold is 2.
model.compile(optimizer='nadam', loss=HuberLoss(2.))

# Reload the saved model, resolving the name "HuberLoss" to the class.
model = keras.models.load_model(
    "my_model_with_a_custom_loss.h5",
    custom_objects={"HuberLoss": HuberLoss},
)

# Custom Activation Functions

In [0]:
def my_softplus(z):
  """Softplus activation computed literally as ``log(exp(z) + 1)``.

  A later cell in this notebook shows the gradient of this form evaluating
  to NaN at ``z = 100``.
  """
  exponentiated = tf.exp(z)
  return tf.math.log(exponentiated + 1.0)

# Custom Kernel Initializer

In [0]:
def my_golrot_initializer(shape,dtype=tf.float32):
  """Sample initial weights from a normal distribution.

  The standard deviation is ``sqrt(2 / (shape[0] + shape[1]))``.

  Args:
    shape: Shape of the tensor to create; its first two entries are read.
    dtype: Dtype of the returned tensor.

  Returns:
    A random tensor of the requested shape and dtype.
  """
  fan_sum = shape[0] + shape[1]
  spread = tf.sqrt(2. / fan_sum)
  return tf.random.normal(shape, stddev=spread, dtype=dtype)

# Custom Regularizer

In [0]:
def my_l1_regularizer(weights):
  """Return the sum of the absolute values of ``0.01 * weights``."""
  scaled = 0.01 * weights
  return tf.reduce_sum(tf.abs(scaled))

# Custom Constraints

In [0]:
def my_positive_weights(weights):
  """Return ``weights`` with every negative entry replaced by zero."""
  negative_mask = weights < 0
  zeros = tf.zeros_like(weights)
  return tf.where(negative_mask, zeros, weights)

### Usage

In [0]:
# Dense layer of 30 units wired to the custom activation, initializer,
# regularizer and constraint functions.
layer = keras.layers.Dense(
    units=30,
    kernel_constraint=my_positive_weights,
    kernel_regularizer=my_l1_regularizer,
    kernel_initializer=my_golrot_initializer,
    activation=my_softplus,
)

In [0]:
class MyL1_regularizer(keras.regularizers.Regularizer):
  """L1-style weight penalty with a configurable scaling factor.

  Args:
    factor: Multiplier applied to the weights before taking absolute values.
  """
  def __init__(self,factor):
    self.factor = factor

  def __call__(self,weights):
    """Return the sum of ``|factor * weights|``."""
    scaled = self.factor * weights
    return tf.reduce_sum(tf.abs(scaled))

  def get_config(self):
    """Return the constructor arguments as a dict."""
    return dict(factor=self.factor)

 - Implement `call()` for losses, layers, and models.
 - Implement `__call__()` (as in the class above) for regularizers, initializers, and constraints.

# Custom Metrics

### Streaming/Stateful Metric

In [37]:
# Streaming precision metric: each call folds one batch into its running state.
precision = keras.metrics.Precision()
precision([0,1,1,1,0,1,0,1],[1,1,0,1,0,1,0,1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [38]:
# Second batch: the displayed value covers both batches seen so far, not this batch alone.
precision([0,1,0,0,1,0,1,1],[1,0,1,1,0,0,0,0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [39]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [40]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [0]:
# Reset the metric's accumulated state.
precision.reset_states()

In [0]:
class HuberMetric(keras.metrics.Metric):
  """Streaming mean of the Huber loss over all elements seen so far.

  Args:
    threshold: Threshold passed to ``create_huber``. Defaults to 1.0.
    **kwargs: Forwarded unchanged to ``keras.metrics.Metric``.
  """
  def __init__(self,threshold=1.0,**kwargs):
    super().__init__(**kwargs)
    self.threshold = threshold
    self.huber_fn = create_huber(threshold)
    # Running sum of the loss and number of elements it was summed over.
    self.total = self.add_weight(name='total',initializer='zeros')
    self.count = self.add_weight(name='count',initializer='zeros')

  def update_state(self,y_true,y_pred,sample_weight=None):
    """Accumulate one batch. ``sample_weight`` is accepted but not used."""
    metric = self.huber_fn(y_true,y_pred)
    self.total.assign_add(tf.reduce_sum(metric))
    self.count.assign_add(tf.cast(tf.size(y_true),tf.float32))

  def result(self):
    """Return the accumulated total divided by the accumulated count."""
    return self.total/self.count

  def get_config(self):
    """Return the base config extended with ``threshold``."""
    base_config = super().get_config()
    return {**base_config, "threshold":self.threshold}

# Custom Layer

#### Layer without any weights - can also be used as an activation function

In [0]:
# Layer that applies tf.exp to its input through a Lambda wrapper.
exponential_layer = keras.layers.Lambda(lambda x:tf.exp(x))

#### Stateful Layer

In [0]:
class MyDense(keras.layers.Layer):
  """Fully connected layer: ``activation(x @ kernel + bias)``.

  Args:
    units: Number of output units.
    activation: Anything accepted by ``keras.activations.get``.
    **kwargs: Forwarded unchanged to ``keras.layers.Layer``.
  """
  def __init__(self,units,activation=None,**kwargs):
    super().__init__(**kwargs)
    self.units = units
    self.activation =  keras.activations.get(activation)

  def build(self,batch_input_shape):
    """Create the kernel and bias once the input's last dimension is known."""
    self.kernel = self.add_weight(
        name='kernel',
        shape = [batch_input_shape[-1],self.units],
        initializer = 'glorot_normal'
    )
    self.bias = self.add_weight(
        name='bias',
        shape=[self.units],
        initializer="zeros"
    )
    super().build(batch_input_shape)

  def call(self,x):
    """Apply the affine transform followed by the activation."""
    return self.activation(x @ self.kernel + self.bias)

  def compute_output_shape(self,batch_input_shape):
    """Return the input shape with its last dimension replaced by ``units``."""
    # Wrapping in TensorShape first lets plain tuples/lists be passed as well.
    leading_dims = tf.TensorShape(batch_input_shape).as_list()[:-1]
    return tf.TensorShape(leading_dims+[self.units])

  def get_config(self):
    """Return the base config extended with ``units`` and ``activation``."""
    base_config = super().get_config()
    return {
        **base_config,
        "units": self.units,
        "activation": keras.activations.serialize(self.activation)
    }

#### Multi Input Layer

In [0]:
class MyMultiLayer(keras.layers.Layer):
  """Layer taking a pair of inputs and returning their sum, product and quotient."""
  def call(self,X):
    """Return ``[x1 + x2, x1 * x2, x1 / x2]`` for the input pair ``X``."""
    first, second = X
    total = first + second
    product = first * second
    quotient = first / second
    return [total, product, quotient]
  def compute_output_shape(self,batch_input_shape):
    """Report the first input's shape for each of the three outputs."""
    first_shape, _ = batch_input_shape
    return [first_shape] * 3

#### Miscellaneous Functional Layer

In [0]:
class MyGaussianNoise(keras.layers.Layer):
  """Adds Gaussian noise to its input when called with a truthy ``training``.

  Args:
    stddev: Standard deviation of the noise.
    **kwargs: Forwarded unchanged to ``keras.layers.Layer``.
  """
  def __init__(self,stddev,**kwargs):
    # Must be ** (keyword unpacking); a single * would pass the dict's keys
    # as positional arguments to the base constructor.
    super().__init__(**kwargs)
    self.stddev = stddev

  def call(self,X,training=None):
    """Return ``X`` plus noise when ``training`` is truthy, else ``X`` unchanged."""
    if training:
      noise = tf.random.normal(tf.shape(X),stddev=self.stddev)
      return X+noise
    else:
      return X

  def compute_output_shape(self,batch_input_shape):
    """The output shape equals the input shape."""
    return batch_input_shape

# Custom Models

In [0]:
class ResidualLayer(keras.layers.Layer):
  """Stack of Dense layers whose output is added back to the block's input.

  Args:
    n_layers: Number of Dense layers in the stack.
    n_neurons: Units per Dense layer. The skip connection adds the stack's
      output to the input, so the two must be addable.
    **kwargs: Forwarded unchanged to ``keras.layers.Layer``.
  """
  def __init__(self, n_layers,n_neurons,**kwargs):
    # Forward kwargs so options such as `name` are not silently dropped.
    super().__init__(**kwargs)
    self.hidden = [keras.layers.Dense(n_neurons,activation='elu',kernel_initializer="he_normal") for _ in range(n_layers)]

  def call(self,inputs):
    """Run the inputs through the stack and add the skip connection."""
    Z= inputs
    for layer in self.hidden:
      Z = layer(Z)
    return inputs+Z

In [0]:
class ResidualModel(keras.Model):
  """Dense layer, a residual block applied four times, a second block, then the output layer.

  Args:
    output_dim: Number of units in the final Dense layer.
    **kwargs: Forwarded unchanged to ``keras.Model``.
  """
  def __init__(self, output_dim, **kwargs):
    super().__init__(**kwargs)
    self.hidden1 = keras.layers.Dense(30, kernel_initializer="he_normal", activation="elu")
    self.block1 = ResidualLayer(2,30)
    self.block2 = ResidualLayer(2,30)
    self.out=keras.layers.Dense(output_dim)

  def call(self,inputs):
    """Forward pass; ``block1`` is reused on each of its four applications."""
    activations = self.hidden1(inputs)
    for _ in range(4):
      activations = self.block1(activations)
    return self.out(self.block2(activations))

# Losses and Metrics Based on Model Internals

In [0]:
class ReconstructionRegressor(keras.Model):
  """Regressor with an auxiliary loss for reconstructing its own inputs.

  Five hidden Dense layers feed both the output layer and a reconstruction
  layer; the mean squared reconstruction error, scaled by 0.05, is
  registered through ``add_loss`` on every call.

  Args:
    output_dim: Number of units in the output Dense layer.
    **kwargs: Forwarded unchanged to ``keras.Model``.
  """
  def __init__(self,output_dim,**kwargs):
    # Forward kwargs so options such as `name` are not silently dropped.
    super().__init__(**kwargs)
    self.hidden = [keras.layers.Dense(30,activation="selu",kernel_initializer="lecun_normal") for _ in range(5)]
    self.out = keras.layers.Dense(output_dim)

  def build(self, batch_shape_input):
    """Create the reconstruction layer sized to the input's last dimension."""
    n_inputs = batch_shape_input[-1]
    self.reconstruct = keras.layers.Dense(n_inputs)
    super().build(batch_shape_input)

  def call(self,inputs):
    """Return the predictions and register the reconstruction loss."""
    Z = inputs
    for layer in self.hidden:
      Z = layer(Z)
    reconstruction = self.reconstruct(Z)
    recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
    self.add_loss(0.05* recon_loss)
    return self.out(Z)

# Gradient Taping

In [0]:
def f(w1,w2):
  """Evaluate ``3 * w1**2 + 2 * w1 * w2``."""
  quadratic_term = 3 * w1 ** 2
  cross_term = 2 * w1 * w2
  return quadratic_term + cross_term

In [0]:
# Evaluation point (w1, w2) and step size used by the finite-difference cells.
w1,w2 = 5,3
eps = 1e-5

In [53]:
(f(w1+eps,w2) - f(w1,w2))/eps

36.00002999917251

In [54]:
(f(w1,w2+eps) - f(w1,w2))/eps

10.000000000331966

In [0]:
# Evaluate f on Variables inside a tape, then ask for both partial derivatives at once.
w1,w2 = tf.Variable(5.),tf.Variable(3.)
with tf.GradientTape() as tape:
  z = f(w1,w2)
gradients = tape.gradient(z,[w1,w2])

In [56]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [57]:
# Demonstration: a non-persistent tape can be queried only once.
with tf.GradientTape() as tape:
  z = f(w1,w2)
gradients = tape.gradient(z,w1)
try:
  # The second query on the same tape raises RuntimeError; catch and show it
  # so the notebook still runs top to bottom.
  gradients = tape.gradient(z,w2)
except RuntimeError as error:
  print("RuntimeError:", error)

RuntimeError: ignored

In [0]:
# A persistent tape may be queried more than once.
with tf.GradientTape(persistent=True) as tape:
  z = f(w1,w2)
dz_dw1 = tape.gradient(z,w1)
dz_dw2 = tape.gradient(z,w2)
# Drop the reference to the persistent tape once done so its resources can be freed.
del tape

In [0]:
# Same computation on constants without watching them: both gradients come back as None.
c1,c2 = tf.constant(5.),tf.constant(3.)
with tf.GradientTape() as tape:
  z = f(c1,c2)
gradients = tape.gradient(z,[c1,c2])

In [60]:
gradients

[None, None]

In [0]:
# Explicitly watching the constants makes the tape return gradients for them.
c1,c2 = tf.constant(5.),tf.constant(3.)
with tf.GradientTape() as tape:
  tape.watch(c1)
  tape.watch(c2)
  z = f(c1,c2)
gradients = tape.gradient(z,[c1,c2])

In [62]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [0]:
def f_stop_gradient(w1,w2):
  """Same value as ``3 * w1**2 + 2 * w1 * w2``, with the cross term wrapped in ``tf.stop_gradient``.

  Given a distinct name so it does not shadow the plain ``f`` defined earlier
  in the notebook.
  """
  return 3 * w1 **2 + tf.stop_gradient(2 * w1 * w2)

w1,w2 = tf.Variable(5.),tf.Variable(3.)
with tf.GradientTape() as tape:
  z = f_stop_gradient(w1,w2)
gradients = tape.gradient(z,[w1,w2])

In [64]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [66]:
# Gradient of my_softplus at a large input; the output below shows it is NaN.
x = tf.Variable([100.])
with tf.GradientTape() as tape:
  z = my_softplus(x)
tape.gradient(z,[x])

[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>]

In [0]:
@tf.custom_gradient
def my_better_softplus(z):
  """Softplus with a hand-written gradient.

  Returns:
    A pair ``(log(exp(z) + 1), grad_fn)`` where ``grad_fn(grad)`` returns
    ``grad / (1 + 1 / exp(z))``.
  """
  exp_z = tf.exp(z)

  def backward(grad):
    # Scale the incoming gradient by 1 / (1 + 1/exp(z)).
    denominator = 1 + 1 / exp_z
    return grad / denominator

  forward_value = tf.math.log(exp_z + 1)
  return forward_value, backward

In [70]:
# Same input through the custom-gradient version; the output below shows a gradient of 1.
x = tf.Variable([100.])
with tf.GradientTape() as tape:
  z = my_better_softplus(x)
tape.gradient(z,[x])

[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>]

# Custom Training Loops

In [0]:
# Two-layer regression model; both layers share one L2 kernel regularizer,
# whose penalties the custom loop reads from `model.losses`.
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(
        30,
        kernel_regularizer=l2_reg,
        kernel_initializer="he_normal",
        activation='relu',
    ),
    keras.layers.Dense(1, kernel_regularizer=l2_reg),
])

In [0]:
def random_batch(X,y,batch_size=32):
  """Sample a random batch of matching rows from ``X`` and ``y``.

  Indices are drawn with ``np.random.randint``, so the same row may appear
  more than once in a batch.

  Args:
    X: Indexable feature array.
    y: Indexable target array aligned with ``X``.
    batch_size: Number of rows to draw.

  Returns:
    A tuple ``(X_batch, y_batch)``.
  """
  n_rows = len(X)
  picks = np.random.randint(n_rows, size=batch_size)
  X_picked = X[picks]
  y_picked = y[picks]
  return X_picked, y_picked

In [0]:
def print_status_bar(iteration,total,loss,metrics=None):
  """Print a one-line progress report that overwrites itself.

  Args:
    iteration: Number of items processed so far.
    total: Total number of items.
    loss: Object exposing ``name`` and ``result()``; reported first.
    metrics: Optional list of further objects with ``name`` and ``result()``.

  The line starts with a carriage return and ends with a newline only once
  ``iteration`` is no longer below ``total``.
  """
  tracked = [loss] + (metrics or [])
  summary = '-'.join("{}:{:.4f}".format(item.name, item.result()) for item in tracked)
  if iteration < total:
    terminator = ""
  else:
    terminator = "\n"
  prefix = "\r{}/{} - ".format(iteration, total)
  print(prefix + summary, end=terminator)

#### HyperParameters

In [77]:
# Hyperparameters and the optimizer/loss/metric objects used by the custom loop.
# NOTE(review): X_train is not defined in any cell shown above (the recorded
# output is a NameError); the dataset must be loaded before this cell runs.
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
# `learning_rate` is the supported keyword; `lr` is a deprecated alias.
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

NameError: ignored

##### Custom Loop

In [79]:
# Custom training loop: one optimizer step per random batch, with running
# loss and metrics shown on a self-overwriting status line.
# NOTE(review): X_train_scaled and y_train are not defined in any cell shown above.
for epoch in range(1,n_epochs+1):
  print("Epoch {}/{}".format(epoch, n_epochs))
  for step in range(1, n_steps + 1):
    X_batch, y_batch = random_batch(X_train_scaled,y_train,batch_size)
    with tf.GradientTape() as tape:
      # The second argument of a model call is `training`, not the labels.
      y_pred = model(X_batch,training=True)
      main_loss = tf.reduce_mean(loss_fn(y_batch,y_pred))
      # Add the regularization losses collected by the model's layers.
      loss = tf.add_n([main_loss]+model.losses)
    gradients = tape.gradient(loss,model.trainable_variables)
    optimizer.apply_gradients(zip(gradients,model.trainable_variables))
    mean_loss(loss)
    for metric in metrics:
      metric(y_batch,y_pred)
    print_status_bar(step * batch_size, len(y_train),mean_loss,metrics)
  # Final call with iteration == total so the status line ends with a newline.
  print_status_bar(len(y_train), len(y_train),mean_loss,metrics)
  for metric in [mean_loss]+metrics:
    metric.reset_states()

Epoch 1/5


NameError: ignored

# Tensorflow Functions And Graphs

In [0]:
def cube(x):
  """Return ``x`` raised to the third power."""
  cubed = x ** 3
  return cubed

In [81]:
cube(tf.constant(3.))

<tf.Tensor: shape=(), dtype=float32, numpy=27.0>

In [83]:
# Wrap the plain Python function with tf.function; the bare name then displays the wrapper object.
tf_cube = tf.function(cube)
tf_cube

<tensorflow.python.eager.def_function.Function at 0x7f1542f614e0>

In [84]:
tf_cube(3)

<tf.Tensor: shape=(), dtype=int32, numpy=27>

In [85]:
tf_cube(tf.constant(3.))

<tf.Tensor: shape=(), dtype=float32, numpy=27.0>

In [0]:
@tf.function
def tf_cube(x):
  """Return ``x`` raised to the third power (decorator form of ``tf.function(cube)``)."""
  cubed = x ** 3
  return cubed

In [87]:
tf_cube(tf.constant(3.))

<tf.Tensor: shape=(), dtype=float32, numpy=27.0>

In [88]:
tf_cube.python_function(2)

8