In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
tf.__version__

'2.3.0'

In [2]:
# creating matrix
t =tf.constant([
                 [1., 2., 3.],
                 [4., 5., 6.]
               ])
t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [3]:
# scalar 
tf.constant(42) 

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [4]:
t.shape

TensorShape([2, 3])

In [5]:
t.dtype

tf.float32

In [6]:
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [7]:
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [8]:
t + 10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [9]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [10]:
tf.transpose(t)

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[1., 4.],
       [2., 5.],
       [3., 6.]], dtype=float32)>

In [11]:
# @ operator was added in Python 3.5, for matrix multiplication
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

In [12]:
# operations using keras backend
K = keras.backend
K.square(K.transpose(t)) + 10

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
       [14., 35.],
       [19., 46.]], dtype=float32)>

In [13]:
a = np.array([2., 4., 5.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [14]:
t.numpy() 

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [15]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [16]:
 np.square(t)

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)

In [17]:
# no autmatic type conversion in tf as this degrades the performance
# tf.constant(2.) + tf.constant(40)

In [18]:
t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

In [19]:
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [20]:
v.assign(2 * v)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [21]:
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [22]:
v[0, 1].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [23]:
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [24]:
v[:, 2].assign([0., 1.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [25]:
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [26]:
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

In [27]:
## Dataset from Training
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, housing.target.reshape(-1, 1), 
                                                              random_state=42)

X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

In [28]:
### custom loss function
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

In [29]:
input_shape = X_train.shape[1:]

model = keras.models.Sequential([keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                                                    input_shape=input_shape),
                                 keras.layers.Dense(1),])

model.compile(loss=huber_fn, optimizer="nadam", metrics=["mae"])

model.fit(X_train_scaled, y_train, epochs=2,validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1aaffe5ab38>

In [30]:
# saving custom model
model.save("my_model_with_a_custom_loss.h5")

In [31]:
# loading custom model
model = keras.models.load_model("my_model_with_a_custom_loss.h5",custom_objects={"huber_fn": huber_fn})

In [32]:
# threshold as parameter

def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2,validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1aa881335f8>

In [33]:
model.save("my_model_with_a_custom_loss_threshold_2.h5")
model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5",
                                custom_objects={"huber_fn": create_huber(2.0)})

In [34]:
# custom loss using keras sub class
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    
    # get config will be called when saving the model 
    # this aovids needs to pass custom object dictionary while model load
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [35]:
model.compile(loss=HuberLoss(2.), optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=2,validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1aa892d8780>

In [36]:
model.save("my_model_with_a_custom_loss_class.h5")
model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",custom_objects={"HuberLoss": HuberLoss})

In [37]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [38]:
# custom training function implementations

def my_softplus(z): # return value is just tf.nn.softplus(z)
    return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights): # return value is just tf.nn.relu(weights)
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

In [39]:
model = keras.models.Sequential([keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                                                    input_shape=input_shape),
                                 keras.layers.Dense(1, activation=my_softplus,
                                                    kernel_regularizer=my_l1_regularizer,
                                                    kernel_constraint=my_positive_weights,
                                                    kernel_initializer=my_glorot_initializer),])

model.compile(loss="mse", optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2,validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1aa8a44dbe0>

In [40]:
# saving model
model.save("my_model_with_many_custom_parts.h5")

# loading model
model = keras.models.load_model("my_model_with_many_custom_parts.h5",
                                custom_objects={"my_l1_regularizer": my_l1_regularizer,
                                                "my_positive_weights": my_positive_weights,
                                                "my_glorot_initializer": my_glorot_initializer,
                                                "my_softplus": my_softplus,})

In [41]:
# streaming metric (or stateful metric), as it is gradually updated, batch after batch
precision = keras.metrics.Precision()

# 5 positive predictions in the first batch, 4 of which were correct: that’s 80% precision
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [42]:
# 3 positive predictions in the second batch, but they were all incorrect: that’s 0% precision for the second batch
# mean of these two precisions, you get 40%
# there were a total of 4 true positives (4 +0) out of 8 positive predictions (5 + 3), 
# so the overall precision is 50%, not 40%
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [43]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [44]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [45]:
### creating custom streaming metric
class HuberMetric(keras.metrics.Metric):
    
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # handles base args (e.g., dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        
        # The constructor uses the add_weight() method to create the variables needed 
        # to keep track of the metric’s state over multiple batches—in this case, 
        # the sum of all Huber losses (total) and the number of instances seen so far (count
        
        self.total = self.add_weight("total", initializer="zeros")
        self.count = self.add_weight("count", initializer="zeros")
    
   
    # The update_state() method is called when you use an instance of this class as a function 
    # (as we did with the Precision object).
    # It updates the variables, given the labels and predictions
    
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    
    # computes and returns the final result
    
    def result(self):
        return self.total / self.count
    
    # get_config() method to ensure the threshold gets saved along with the mode
    
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [47]:
model.compile(loss="mse", optimizer="nadam", metrics=["mae",HuberMetric()])
model.fit(X_train_scaled, y_train, epochs=2,validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1aa8a66bba8>

In [49]:
# model.save("my_model_with_a_custom_metric.h5")
# model = keras.models.load_model("my_model_with_a_custom_metric.h5",           
#                                custom_objects={"huber_fn": create_huber(2.0),
#                                                "HuberMetric": HuberMetric})

In [50]:
### Custom Layers
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        
         # kwargs: this takes care of standard arguments such as input_shape, trainable, and name
        super().__init__(**kwargs)
        self.units = units
        
        # the activation argument to the appropriate activation function using the keras.activations.get()
        self.activation = keras.activations.get(activation)

    # The build() method is called the first time the layer is used. 
    def build(self, batch_input_shape):
        self.kernel = self.add_weight(name="kernel", shape=[batch_input_shape[-1], self.units],
                                      initializer="glorot_normal")
        
        self.bias = self.add_weight(name="bias", shape=[self.units], initializer="zeros")
        
        super().build(batch_input_shape) # must be at the end

    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units, "activation": keras.activations.serialize(self.activation)}

In [51]:
model = keras.models.Sequential([MyDense(30, activation="relu", input_shape=input_shape),
                                 MyDense(1)])

model.compile(loss="mse", optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2,validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1aa8b9a1748>

In [52]:
model.save("my_model_with_a_custom_layer.h5")
model = keras.models.load_model("my_model_with_a_custom_layer.h5",custom_objects={"MyDense": MyDense})

In [53]:
## custom Layer with multiple input
class MyMultiLayer(keras.layers.Layer):
    def call(self, X):
        X1, X2 = X
        return X1 + X2, X1 * X2

    def compute_output_shape(self, batch_input_shape):
        batch_input_shape1, batch_input_shape2 = batch_input_shape
        return [batch_input_shape1, batch_input_shape2]
    
inputs1 = keras.layers.Input(shape=[2])
inputs2 = keras.layers.Input(shape=[2])
outputs1, outputs2 = MyMultiLayer()((inputs1, inputs2))

In [54]:
## Custom Layers with different Training & Testing behviour

class AddGaussianNoise(keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape

In [55]:
model = keras.models.Sequential([MyDense(30, activation="relu", input_shape=input_shape),
                                 AddGaussianNoise(1),
                                 MyDense(1)])

model.compile(loss="mse", optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2,validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1aa8cb52320>

In [56]:
model.predict(X_test)

array([[298.18564],
       [335.61545],
       [280.36676],
       ...,
       [465.43787],
       [505.62857],
       [222.74283]], dtype=float32)