**Chapter 12 – Distributed TensorFlow**

In [7]:
# To support both python 2 and python 3
from __future__ import division, print_function, unicode_literals

# Common imports
import numpy as np
import os

try:
    # %tensorflow_version only exists in Colab.
    # It asks the Colab runtime for a TensorFlow 1.x release. This is best effort:
    # any exception raised while running the magic is deliberately ignored below.
    %tensorflow_version 1.x
except Exception:
    # Intentionally swallowed so the cell keeps going with the installed TensorFlow.
    pass

# to make this notebook's output stable across runs
def reset_graph(seed=42):
    """Reset the default TensorFlow graph and re-seed TensorFlow and NumPy.

    Call this before building a graph to make the notebook's output stable
    across runs.

    Args:
        seed: Integer seed applied to both TensorFlow's graph-level random
            seed and NumPy's global random generator.
    """
    # Go through tf.compat.v1 so the helpers resolve whether `tf` is bound to
    # a TensorFlow 1.x module or to the v2 API (where the bare
    # tf.reset_default_graph / tf.set_random_seed names no longer exist).
    tf.compat.v1.reset_default_graph()
    tf.compat.v1.set_random_seed(seed)
    np.random.seed(seed)

# To plot pretty figures
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
# Font sizes used for axis labels and tick labels in every figure.
plt.rcParams.update({
    'axes.labelsize': 14,
    'xtick.labelsize': 12,
    'ytick.labelsize': 12,
})

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "distributed"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
# Create the output directory up front; an existing directory is not an error.
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    """Save the current matplotlib figure under IMAGES_PATH.

    Args:
        fig_id: Base file name (without extension) for the figure.
        tight_layout: When true, call ``plt.tight_layout()`` before saving.
        fig_extension: File extension, also passed to savefig as the format.
        resolution: Value passed to savefig as ``dpi``.
    """
    file_name = "".join((fig_id, ".", fig_extension))
    target = os.path.join(IMAGES_PATH, file_name)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(target, format=fig_extension, dpi=resolution)

# Local server

In [33]:
import tensorflow.compat.v2 as tf


In [11]:
# Define a string constant and start a local server to evaluate it against.
# NOTE(review): tf.train.Server looks like a TF 1.x-style API, while the cell above
# binds `tf` to tensorflow.compat.v2 -- confirm which TensorFlow API this cell expects.
c = tf.constant("Hello distributed TensorFlow!")
server = tf.train.Server.create_local_server()

In [12]:
# Open a session against the local server's target and print the evaluated constant.
# NOTE(review): tf.Session is a TF 1.x-style API -- confirm it is reachable through
# the `tf` alias in use when this cell runs.
with tf.Session(server.target) as sess:
    print(sess.run(c))

b'Hello distributed TensorFlow!'


In [27]:
import tensorflow as tf2

# Loss function 

Suppose you want to train a regression model, but your training set is a bit noisy. Of
course, you start by trying to clean up your dataset by removing or fixing the outliers,
but it turns out to be insufficient: the dataset is still noisy. Which loss function should
you use? The mean squared error might penalize large errors too much, so your
model will end up being imprecise. The mean absolute error would not penalize outliers
as much, but training might take a while to converge and the trained model
might not be very precise. This is probably a good time to use the Huber loss (introduced
in Chapter 10) instead of the good old MSE. The Huber loss is not currently
part of the official Keras API, but it is available in tf.keras (just use an instance of the
keras.losses.Huber class).

In [15]:
def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    Errors whose absolute value is below 1 contribute ``error**2 / 2``;
    all other errors contribute ``|error| - 0.5``.
    """
    residual = y_true - y_pred
    magnitude = tf.abs(residual)
    quadratic_part = tf.square(residual) / 2
    linear_part = magnitude - 0.5
    return tf.where(magnitude < 1, quadratic_part, linear_part)

Next, you can just use this loss when you compile the Keras model, then train your model:

In [None]:
# Illustrative cell: `model`, `X_train` and `y_train` are not defined in the cells
# shown here, and `[...]` stands in for the remaining fit() arguments.
# The custom function is passed directly as the loss.
model.compile(loss=huber_fn, optimizer="nadam")
model.fit(X_train, y_train, [...])

Saving a model containing a custom loss function actually works fine, as Keras just
saves the name of the function. However, whenever you load it, you need to provide a
dictionary that maps the function name to the actual function. More generally, when
you load a model containing custom objects, you need to map the names to the
objects:


In [None]:
# custom_objects maps the saved name "huber_fn" back to the Python function,
# as described in the paragraph above.
model = keras.models.load_model("my_model_with_a_custom_loss.h5",
 custom_objects={"huber_fn": huber_fn})

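Alternatively, we could implement the loss as a HuberLoss class that subclasses keras.losses.Loss. Its get_config() method stores the threshold with the model, but we still need to pass the class through custom_objects when we load the saved model again.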

In [17]:
import keras
class HuberLoss(keras.losses.Loss):
    """Huber loss with a configurable threshold.

    Errors whose absolute value is below ``threshold`` contribute
    ``error**2 / 2``; all other errors contribute
    ``threshold * |error| - threshold**2 / 2``.
    """

    def __init__(self, threshold=1.0, **kwargs):
        """Store the threshold and forward remaining keyword arguments to the base class."""
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss between ``y_true`` and ``y_pred``."""
        delta = self.threshold
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic_part = tf.square(residual) / 2
        linear_part = delta * magnitude - delta**2 / 2
        return tf.where(magnitude < delta, quadratic_part, linear_part)

    def get_config(self):
        """Return the base config extended with this loss's threshold."""
        return dict(super().get_config(), threshold=self.threshold)

# Example

In [19]:
import pandas as pd
# Download the Auto MPG data set from the UCI repository.
# NOTE(review): the URL uses plain http -- confirm whether an https endpoint should be used.
url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data'
column_names = ['MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight',
                'Acceleration', 'Model Year', 'Origin']

# The file has no header row, so column names are supplied explicitly.
# '?' is parsed as a missing value, fields are space-separated (extra leading
# spaces skipped), and everything after a tab on a line is treated as a comment.
raw_dataset = pd.read_csv(url, names=column_names,
                          na_values='?', comment='\t',
                          sep=' ', skipinitialspace=True)

# Work on a copy so the raw download stays untouched.
dataset = raw_dataset.copy()
dataset.tail()

Unnamed: 0,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model Year,Origin
393,27.0,4,140.0,86.0,2790.0,15.6,82,1
394,44.0,4,97.0,52.0,2130.0,24.6,82,2
395,32.0,4,135.0,84.0,2295.0,11.6,82,1
396,28.0,4,120.0,79.0,2625.0,18.6,82,1
397,31.0,4,119.0,82.0,2720.0,19.4,82,1


In [20]:
# Drop the rows that contain missing values ('?' entries were parsed as NaN on load).
dataset = dataset.dropna()

In [21]:
# Replace the numeric Origin codes with country names, then one-hot encode
# them into separate Europe / Japan / USA columns (no column-name prefix).
origin_names = {1: 'USA', 2: 'Europe', 3: 'Japan'}
dataset = pd.get_dummies(
    dataset.assign(Origin=dataset['Origin'].map(origin_names)),
    columns=['Origin'],
    prefix='',
    prefix_sep='',
)
dataset.tail()

Unnamed: 0,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model Year,Europe,Japan,USA
393,27.0,4,140.0,86.0,2790.0,15.6,82,0,0,1
394,44.0,4,97.0,52.0,2130.0,24.6,82,1,0,0
395,32.0,4,135.0,84.0,2295.0,11.6,82,0,0,1
396,28.0,4,120.0,79.0,2625.0,18.6,82,0,0,1
397,31.0,4,119.0,82.0,2720.0,19.4,82,0,0,1


In [22]:
# Randomly sample 80% of the rows for training (fixed random_state for repeatability);
# the rows whose index was not sampled form the test set.
train_dataset = dataset.sample(frac=0.8, random_state=0)
test_dataset = dataset.drop(train_dataset.index)

In [24]:
# Copy the splits so that removing the label column leaves train_dataset/test_dataset intact.
train_features = train_dataset.copy()
test_features = test_dataset.copy()

# pop() removes the 'MPG' column from the feature frames and returns it as the labels.
train_labels = train_features.pop('MPG')
test_labels = test_features.pop('MPG')

In [35]:
from tensorflow.keras import layers
# General-purpose normalizer over the last axis (created here, not used in this cell).
normalizer = tf.keras.layers.Normalization(axis=-1)
horsepower = np.array(train_features['Horsepower'])

# Single-feature normalizer: axis=None computes one scalar mean/variance.
horsepower_normalizer = layers.Normalization(input_shape=[1,], axis=None)
# Fit the layer's mean and variance to the training horsepower values. Without
# this call the layer keeps its initial statistics and `horsepower` is unused.
horsepower_normalizer.adapt(horsepower)

# Linear regression on the single normalized Horsepower input.
horsepower_model = tf.keras.Sequential([
    horsepower_normalizer,
    layers.Dense(units=1)
])

horsepower_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 normalization_10 (Normaliza  (None, 1)                3         
 tion)                                                           
                                                                 
 dense (Dense)               (None, 1)                 2         
                                                                 
Total params: 5
Trainable params: 2
Non-trainable params: 3
_________________________________________________________________


In [37]:
# Compile with the Adam optimizer (learning rate 0.1) and the custom
# HuberLoss class, using a threshold of 2.0.
horsepower_model.compile(
    optimizer=tf.optimizers.Adam(learning_rate=0.1),
    loss=HuberLoss(2.))

# Custom Activation Functions, Initializers, Regularizers, and Constraints

Most Keras functionalities, such as losses, regularizers, constraints, initializers, metrics,
activation functions, layers and even full models can be customized in very much
the same way. Most of the time, you will just need to write a simple function, with the
appropriate inputs and outputs. For example, here are examples of a custom activation
function (equivalent to keras.activations.softplus or tf.nn.softplus), a
custom Glorot initializer (equivalent to keras.initializers.glorot_normal), a custom
ℓ1 regularizer (equivalent to keras.regularizers.l1(0.01)) and a custom constraint
that ensures weights are all positive (equivalent to
keras.constraints.nonneg() or tf.nn.relu):

In [38]:
def my_softplus(z):
    """Custom softplus activation: ``log(exp(z) + 1)``."""
    exp_z = tf.exp(z)
    return tf.math.log(exp_z + 1.0)

def my_glorot_initializer(shape, dtype=tf.float32):
    """Custom Glorot initializer drawing from a normal distribution.

    The standard deviation is ``sqrt(2 / (shape[0] + shape[1]))``.
    """
    fan_in = shape[0]
    fan_out = shape[1]
    scale = tf.sqrt(2. / (fan_in + fan_out))
    return tf.random.normal(shape, stddev=scale, dtype=dtype)

def my_l1_regularizer(weights):
    """Custom l1 penalty: the sum of the absolute values of ``0.01 * weights``."""
    scaled_weights = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled_weights))

def my_positive_weights(weights):
    """Custom constraint: replace every negative weight with zero."""
    is_negative = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)


In [39]:
# Plug the four custom functions defined above into a 30-unit Dense layer:
# activation, kernel initializer, kernel regularizer and kernel constraint.
layer = keras.layers.Dense(30, activation=my_softplus,
 kernel_initializer=my_glorot_initializer,
 kernel_regularizer=my_l1_regularizer,
 kernel_constraint=my_positive_weights)

As with the loss function, if a custom function has hyperparameters that must be saved with the model, we can create a subclass instead. Here is an example:


In [40]:
class MyL1Regularizer(keras.regularizers.Regularizer):
    """l1 regularizer with a configurable scaling factor."""

    def __init__(self, factor):
        """Store the factor that multiplies the weights before the l1 sum."""
        self.factor = factor

    def __call__(self, weights):
        """Return the sum of the absolute values of ``factor * weights``."""
        scaled_weights = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled_weights))

    def get_config(self):
        """Return the configuration dictionary holding the factor."""
        return dict(factor=self.factor)


# Custom Metrics

Losses and metrics are conceptually not the same thing: losses are used by Gradient
Descent to train a model, so they must be differentiable (at least where they are evaluated)
and their gradients should not be 0 everywhere. Plus, it’s okay if they are not
easily interpretable by humans (e.g., cross-entropy). In contrast, metrics are used to
evaluate a model, they must be more easily interpretable, and they can be nondifferentiable or have 0 gradients everywhere (e.g., accuracy).

That said, in most cases, defining a custom metric function is exactly the same as
defining a custom loss function. In fact, we could even use the Huber loss function we
created earlier as a metric; it would work just fine (and persistence would also work
the same way, in this case only saving the name of the function, "huber_fn"):

In [None]:
# Illustrative cell: `model` and `create_huber` are not defined in the cells shown here.
# NOTE(review): create_huber(2.0) presumably returns a Huber function with threshold 2.0 -- confirm.
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

When you define a metric using a simple function, Keras automatically calls it for
each batch, and it keeps track of the mean during each epoch, just like we did manually.
So the only benefit of our HuberMetric class is that the threshold will be saved.


# Custom Layers
You may occasionally want to build an architecture that contains an exotic layer for
which TensorFlow does not provide a default implementation. In this case, you will
need to create a custom layer. Or sometimes you may simply want to build a very
repetitive architecture, containing identical blocks of layers repeated many times, and
it would be convenient to treat each block of layers as a single layer. For example, if
the model is a sequence of layers A, B, C, A, B, C, A, B, C, then you might want to
define a custom layer D containing layers A, B, C, and your model would then simply
be D, D, D. Let’s see how to build custom layers.
First, some layers have no weights, such as keras.layers.Flatten or
keras.layers.ReLU. If you want to create a custom layer without any weights, the simplest
option is to write a function and wrap it in a keras.layers.Lambda layer. For example,
the following layer will apply the exponential function to its inputs:

You can use it as an activation function as well.

In [2]:
# Weightless custom layer: wraps tf.exp in a Lambda layer so it is applied to the layer's inputs.
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))


To build a custom layer that has weights, you need to create a subclass of the keras.layers.Layer class. For example,
the following class implements a simplified version of the Dense layer:


In [3]:
class MyDense(keras.layers.Layer):
    """Simplified Dense layer computing ``activation(X @ kernel + bias)``."""

    def __init__(self, units, activation=None, **kwargs):
        """Store the layer hyperparameters.

        Args:
            units: Number of output units (size of the last output dimension).
            activation: Anything accepted by ``keras.activations.get``.
            **kwargs: Forwarded to the base ``Layer`` constructor.
        """
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias once the input's last dimension is known."""
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal")
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros")
        super().build(batch_input_shape)  # must be at the end

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``."""
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])

    # get_config must be a method of the class: defined at module level it is
    # never used by the layer, and its zero-argument super() call cannot work.
    def get_config(self):
        """Return the base config extended with ``units`` and the serialized activation."""
        base_config = super().get_config()
        return {**base_config,
                "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

If your layer needs to have a different behavior during training and during testing
(e.g., if it uses Dropout or BatchNormalization layers), then you must add a training
argument to the call() method and use this argument to decide what to do. For
example, let’s create a layer that adds Gaussian noise during training (for regularization),
but does nothing during testing (Keras actually has a layer that does the same
thing: keras.layers.GaussianNoise):

In [5]:
class MyGaussianNoise(keras.layers.Layer):
    """Layer that adds Gaussian noise to its input when ``training`` is truthy."""

    def __init__(self, stddev, **kwargs):
        """Store the noise standard deviation; other kwargs go to the base Layer."""
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        """Return ``X`` plus random normal noise when training, otherwise ``X`` unchanged."""
        if not training:
            return X
        return X + tf.random.normal(tf.shape(X), stddev=self.stddev)

    def compute_output_shape(self, batch_input_shape):
        """The output shape is the same as the input shape."""
        return batch_input_shape


# Custom Models


As we mentioned in Chapter 10, we can also create custom models. Here is one example; we will explain and use custom models in more detail in Chapter 14.

In [6]:
class ResidualBlock(keras.layers.Layer):
    """Stack of Dense layers whose output is added back to the block's input."""

    def __init__(self, n_layers, n_neurons, **kwargs):
        """Create ``n_layers`` Dense layers of ``n_neurons`` units each (ELU, He normal init)."""
        super().__init__(**kwargs)
        dense_stack = []
        for _ in range(n_layers):
            dense_stack.append(
                keras.layers.Dense(n_neurons, activation="elu",
                                   kernel_initializer="he_normal"))
        self.hidden = dense_stack

    def call(self, inputs):
        """Run the inputs through the hidden layers and add the skip connection."""
        activations = inputs
        for dense in self.hidden:
            activations = dense(activations)
        return inputs + activations

In [8]:
class ResidualRegressor(keras.models.Model):
    """Regression model built from a Dense layer, two residual blocks and an output layer."""

    def __init__(self, output_dim, **kwargs):
        """Create the layers; ``output_dim`` is the number of units of the output layer."""
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(
            30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Apply hidden1, then four passes of block1 followed by block2, then the output layer."""
        features = self.hidden1(inputs)
        # NOTE(review): both blocks run on each of the 4 passes, reusing the same
        # weights; confirm that block2 is meant to be inside this loop.
        n_passes = 1 + 3
        for _ in range(n_passes):
            features = self.block1(features)
            features = self.block2(features)
        return self.out(features)

We create the layers in the constructor, and use them in the call() method. This
model can then be used like any other model (compile it, fit it, evaluate it and use it to
make predictions). If you also want to be able to save the model using the save()
method, and load it using the keras.models.load_model() function, you must
implement the get_config() method (as we did earlier) in both the ResidualBlock
class and the ResidualRegressor class. Alternatively, you can just save and load the
weights using the save_weights() and load_weights() methods.

# Losses and Metrics Based on Model Internals

The custom losses and metrics we defined earlier were all based on the labels and the
predictions (and optionally sample weights). However, you will occasionally want to
define losses based on other parts of your model, such as the weights or activations of
its hidden layers. This may be useful for regularization purposes, or to monitor some
internal aspect of your model.

# Custom Training Loops

In some rare cases, the fit() method may not be flexible enough for what you need
to do. For example, the Wide and Deep paper we discussed in Chapter 10 actually
uses two different optimizers: one for the wide path and the other for the deep path.

Since the fit() method only uses one optimizer (the one that we specify when compiling the model), implementing this paper requires writing your own custom
loop.

You may also like to write your own custom training loops simply to feel more confident that they do precisely what you intend them to do (perhaps you are unsure about
some details of the fit() method). It can sometimes feel safer to make everything
explicit. However, remember that writing a custom training loop will make your code
longer, more error prone and harder to maintain.


In [11]:
# create a tiny function that will randomly sample a batch of instances from the training set
def random_batch(X, y, batch_size=32):
    """Return a random batch of instances and their matching targets.

    Indices are drawn with replacement from ``range(len(X))`` using NumPy's
    global random generator, and the same indices select rows of both arrays.

    Args:
        X: Indexable array of instances.
        y: Indexable array of targets aligned with ``X``.
        batch_size: Number of indices to draw.

    Returns:
        A ``(X_batch, y_batch)`` tuple.
    """
    picked = np.random.randint(len(X), size=batch_size)
    X_batch = X[picked]
    y_batch = y[picked]
    return X_batch, y_batch

def print_status_bar(iteration, total, loss, metrics=None):
    """Display the training status on a single, overwritten line.

    The line shows the number of steps, the total number of steps, the mean
    loss since the start of the epoch (a Mean metric is expected for ``loss``)
    and any other metrics.

    Args:
        iteration: Current progress count.
        total: Count at which the line is finished with a newline.
        loss: Object exposing ``name`` and ``result()``.
        metrics: Optional list of further objects exposing ``name`` and ``result()``.
    """
    tracked = [loss]
    if metrics:
        tracked = tracked + metrics
    parts = []
    for m in tracked:
        parts.append("{}: {:.4f}".format(m.name, m.result()))
    summary = " - ".join(parts)
    # Stay on the same line until the final iteration, then terminate it.
    if iteration < total:
        line_end = ""
    else:
        line_end = "\n"
    print("\r{}/{} - ".format(iteration, total) + summary, end=line_end)

In [None]:
# fit implementation 
# Manual equivalent of fit(). The names n_epochs, n_steps, batch_size, model,
# loss_fn, optimizer, mean_loss, metrics, X_train_scaled and y_train are not
# defined in this cell and must exist before it is run.
for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train_scaled, y_train)
        # Record the forward pass so gradients of the loss can be computed.
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # Add the model's own losses (model.losses) to the main loss.
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # Update the running mean loss and every metric for this step.
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        # One status update per step, after all metrics have been updated.
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    # Final status line for the epoch (iteration == total ends the line).
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    # Reset the running statistics once per epoch so the displayed values are
    # means over the whole epoch rather than over a single step.
    for metric in [mean_loss] + metrics:
        metric.reset_states()

Moreover, when you write a custom loss function, a custom metric, a custom layer or
any other custom function, and you use it in a Keras model (as we did throughout
this chapter), Keras automatically converts your function into a TF Function, no need
to use tf.function(). So most of the time, all this magic is 100% transparent.

TF Function generates a new graph for every unique set of input shapes and data
types, and it caches it for subsequent calls. For example, if you call
tf_cube(tf.constant(10)), a graph will be generated for int32 tensors of shape []. Then if you call
tf_cube(tf.constant(20)), the same graph will be reused. But if you then call
tf_cube(tf.constant([10, 20])), a new graph will be generated for int32 tensors
of shape [2]. This is how TF Functions handle polymorphism (i.e., varying argument
types and shapes). However, this is only true for tensor arguments: if you pass numerical
Python values to a TF Function, a new graph will be generated for every distinct
value: for example, calling tf_cube(10) and tf_cube(20) will generate two graphs.

If you call a TF Function many times with different numerical
Python values, then many graphs will be generated, slowing down
your program and using up a lot of RAM. Python values should be
reserved for arguments that will have few unique values, such as
hyperparameters like the number of neurons per layer. This allows
TensorFlow to better optimize each variant of your model.
