In [1]:
import tensorflow as tf

# Using TensorFlow like Numpy
## Tensors and Operations

In [2]:
tf.constant([[1.,2.,3.], [4., 5., 6.]])

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [3]:
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [4]:
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
t.shape

TensorShape([2, 3])

In [5]:
t.dtype

tf.float32

In [6]:
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [7]:
# t[..., 1]
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [8]:
t + 10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [9]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [10]:
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

### Keras' Low-Level API

In [11]:
from tensorflow import keras
K = keras.backend
K.square(K.transpose(t)) + 10

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
       [14., 35.],
       [19., 46.]], dtype=float32)>

## Tensors and Numpy

In [12]:
import numpy as np

In [13]:
a = np.array([2., 4., 5.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [14]:
t.numpy()

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [15]:
np.array(t)

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [16]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [17]:
np.square(t)

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)

## Type Conversions

In [18]:
# tf.constant(2.) + tf.constant(40) # Error

In [19]:
# tf.constant(2.) + tf.constant(40, dtype=tf.float64) # Error

In [20]:
t2 = tf.constant(40, dtype=tf.float64)

In [21]:
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

## Variables

In [22]:
v= tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [23]:
v.assign( 2 * v)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [24]:
v[0, 1].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [25]:
v[:, 2].assign([0., 1.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [26]:
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

# Customizing Models and Training Algorithms
## Custom Loss Functions

In [27]:
fashion_mnist = keras.datasets.fashion_mnist
(X_train_full, y_train_full), (X_test, y_test) = fashion_mnist.load_data()
#
print(X_train_full.shape)
print(X_train_full.dtype)

(60000, 28, 28)
uint8


In [28]:
# Hold out the first 5,000 training samples for validation and divide the
# uint8 pixel values by 255 so every feature lies in [0, 1].
X_valid = X_train_full[:5000] / 255.0
X_train = X_train_full[5000:] / 255.0
y_valid = y_train_full[:5000]
y_train = y_train_full[5000:]
# Note: this overwrites X_test, so re-running the cell rescales it again.
X_test = X_test / 255.0

In [29]:
class_names = ["T-shirt/top", "Trouser", "Pullover", "Dress", "Coat", 
              "Sandal", "Shirt", "Sneaker", "Bag", "Ankle boot"]

In [30]:
# Build the model: flatten each 28x28 input, pass it through two ReLU hidden
# layers (300 then 100 units), and finish with a 10-unit softmax output.
model = keras.models.Sequential()
model.add(keras.layers.Flatten(input_shape=[28, 28]))
for n_hidden in (300, 100):
    model.add(keras.layers.Dense(n_hidden, activation="relu"))
model.add(keras.layers.Dense(10, activation="softmax"))

In [31]:
def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    `y_true` is cast to float32 before the residual is taken. Residuals with
    absolute value below 1 contribute `residual**2 / 2`; all others
    contribute `|residual| - 0.5`.
    """
    residual = tf.cast(y_true, tf.float32) - y_pred
    abs_residual = tf.abs(residual)
    return tf.where(abs_residual < 1,
                    tf.square(residual) / 2,
                    abs_residual - 0.5)

In [32]:
# model.compile(loss="mse", optimizer="nadam")
model.compile(loss=huber_fn, optimizer="nadam")
model.fit(X_train, y_train, epochs=1, validation_data=(X_valid, y_valid))



<tensorflow.python.keras.callbacks.History at 0x7fcf5c238a20>

## Saving and Loading Models That Contain Custom Components

In [33]:
model.save("my_model_with_a_custom_loss.h5")

In [34]:
model = keras.models.load_model("my_model_with_a_custom_loss.h5",
                               custom_objects={"huber_fn": huber_fn})

In [35]:
def create_huber(threshold=1.0):
    """Build a Huber loss function for the given threshold.

    Args:
        threshold: absolute-error value at which the loss switches from
            quadratic to linear.

    Returns:
        A function `huber_fn(y_true, y_pred)` returning the element-wise
        Huber loss. Note that the threshold is captured in the closure and
        is not stored when a model using this loss is saved; it has to be
        supplied again when the model is loaded.
    """
    def huber_fn(y_true, y_pred):
        error = tf.cast(y_true, tf.float32) - y_pred
        abs_error = tf.abs(error)
        # Fixed: the mask was previously bound to a misspelled name
        # (`is_snall_error`), so the final tf.where raised NameError.
        is_small_error = abs_error < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * abs_error - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

In [36]:
model.compile(loss=create_huber(2.0), optimizer="nadam")
model.save("my_model_with_a_custom_loss_threshold_2.h5")

In [37]:
model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5",
                custom_objects={"huber_fn": create_huber(2.0)})

In [38]:
class HuberLoss(keras.losses.Loss):
    """Huber loss whose threshold is saved with the model.

    Args:
        threshold: absolute-error value at which the loss switches from
            quadratic to linear.
        **kwargs: forwarded to `keras.losses.Loss` (e.g. `name`, `reduction`).
    """

    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss between `y_true` and `y_pred`."""
        error = tf.cast(y_true, tf.float32) - y_pred
        abs_error = tf.abs(error)
        is_small_error = abs_error < self.threshold
        # Fixed: the quadratic branch needs the 1/2 factor so both branches
        # agree with huber_fn / create_huber at the threshold.
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * abs_error - self.threshold**2 / 2
        # Fixed: previously returned undefined names (squared_error,
        # linear_error), which raised NameError on first use.
        return tf.where(is_small_error, squared_loss, linear_loss)

    def get_config(self):
        """Return the base config extended with `threshold`."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [39]:
model.compile(loss=HuberLoss(2.0), optimizer="nadam")

In [40]:
model.save("my_model_with_a_custom_loss_class.h5")

In [41]:
model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",
                               custom_objects={"HuberLoss": HuberLoss})

## Custom Activation Functions, Initializers, Regularizers, and Constraints

In [42]:
def my_softplus(z): # return value is just tf.nn.softplus(z)
    """Softplus activation computed directly as log(exp(z) + 1)."""
    exp_z = tf.exp(z)
    return tf.math.log(exp_z + 1.0)

In [43]:
def my_glorot_initializer(shape, dtype=tf.float32):
    """Draw normal weights with stddev sqrt(2 / (shape[0] + shape[1]))."""
    fan_in = shape[0]
    fan_out = shape[1]
    scale = tf.sqrt(2.0 / (fan_in + fan_out))
    return tf.random.normal(shape, stddev=scale, dtype=dtype)

In [44]:
def my_l1_regularizer(weights):
    """L1 penalty: sum of absolute values of the weights scaled by 0.01."""
    scaled = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled))

In [45]:
def my_positive_weights(weights): # return value is just tf.nn.relu(weights)
    """Constraint that replaces every negative weight with zero."""
    negative_mask = weights < 0.0
    zeros = tf.zeros_like(weights)
    return tf.where(negative_mask, zeros, weights)

In [46]:
layer = keras.layers.Dense(30, activation=my_softplus,
                          kernel_initializer=my_glorot_initializer,
                          kernel_regularizer=my_l1_regularizer,
                          kernel_constraint=my_positive_weights)

In [47]:
class MyL1Regularizer(keras.regularizers.Regularizer):
    """L1 regularizer with a configurable scaling factor."""

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        """Return the sum of absolute values of `factor * weights`."""
        scaled = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled))

    def get_config(self):
        """Return the constructor arguments needed to recreate this object."""
        return dict(factor=self.factor)

## Custom Metrics

In [48]:
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

In [49]:
precision = keras.metrics.Precision()
precision([0,1,1,1,0,1,0,1], [1,1,0,1,0,1,0,1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [50]:
precision([0,1,0,0,1,0,1,1], [1,0,1,1,0,0,0,0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [51]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [52]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [53]:
precision.reset_states() # both variables get reset to 0.0
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>]

In [54]:
# Streaming metric example, by subclassing
class HuberMetric(keras.metrics.Metric):
    """Streaming mean of the Huber loss, accumulated across batches.

    Args:
        threshold: threshold passed to `create_huber`.
        **kwargs: forwarded to `keras.metrics.Metric` (e.g. `name`, `dtype`).
    """

    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # handles base args (e.g. dtype)
        self.threshold = threshold
        # Fixed: this was stored as `huber_hn`, while update_state() reads
        # `self.huber_fn`, so the first update raised AttributeError.
        self.huber_fn = create_huber(threshold)
        # The name is passed by keyword, matching MyDense.add_weight usage.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Add this batch's Huber losses and element count to the totals.

        `sample_weight` is accepted for API compatibility but is not used.
        """
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the accumulated total divided by the accumulated count."""
        return self.total / self.count

    def get_config(self):
        """Return the base config extended with `threshold`."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

## Custom Layers

In [55]:
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x) )

In [56]:
class MyDense(keras.layers.Layer):
    """Minimal fully connected layer: activation(X @ kernel + bias)."""

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias once the input size is known."""
        n_inputs = batch_input_shape[-1]
        self.kernel = self.add_weight(
            name="kernel",
            shape=[n_inputs, self.units],
            initializer="glorot_normal")
        self.bias = self.add_weight(
            name="bias",
            shape=[self.units],
            initializer="zeros")
        super().build(batch_input_shape) # must be at the end

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        pre_activation = X @ self.kernel + self.bias
        return self.activation(pre_activation)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by `units`."""
        leading_dims = batch_input_shape.as_list()[:-1]
        return tf.TensorShape(leading_dims + [self.units])

    def get_config(self):
        """Return the base config extended with `units` and `activation`."""
        base_config = super().get_config()
        return dict(
            base_config,
            units=self.units,
            activation=keras.activations.serialize(self.activation))

In [57]:
# Multiple input, multiple output
class MyMultiLayer(keras.layers.Layer):
    """Layer taking a pair of inputs and returning their sum, product and quotient."""

    def call(self, X):
        first, second = X
        total = first + second
        product = first * second
        quotient = first / second
        return [total, product, quotient]

    def compute_output_shape(self, batch_input_shape):
        # Every output is reported with the first input's shape.
        first_shape, _ = batch_input_shape
        return [first_shape] * 3 # should probably handle broadcasting rules

In [58]:
# Different behavior in training and in testing
class MyGaussianNoise(keras.layers.Layer):
    """Adds zero-mean Gaussian noise to its input during training only.

    Args:
        stddev: standard deviation of the noise.
        **kwargs: forwarded to `keras.layers.Layer`.
    """

    def __init__(self, stddev, **kwargs):
        # Fixed: was `supper()`, which raised NameError on construction.
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        """Return `X` plus noise when `training` is truthy, else `X` unchanged."""
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        """The output has the same shape as the input."""
        return batch_input_shape

## Custom Models

In [59]:
class ResidualBlock(keras.layers.Layer):
    """Stack of `n_layers` ELU Dense layers whose output is added to the input."""

    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        dense_stack = [
            keras.layers.Dense(n_neurons, activation="elu",
                               kernel_initializer="he_normal")
            for _ in range(n_layers)
        ]
        self.hidden = dense_stack

    def call(self, inputs):
        """Run the inputs through the stack and add the skip connection."""
        activations = inputs
        for dense in self.hidden:
            activations = dense(activations)
        return inputs + activations

In [60]:
class ResidualRegressor(keras.Model):
    """Regression model built from a Dense layer and two residual blocks.

    Args:
        output_dim: number of units in the final Dense output layer.
        **kwargs: forwarded to `keras.Model`.
    """

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu",
                                          kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        # Fixed: was `ResiduleBlock`, an undefined name (NameError).
        self.block2 = ResidualBlock(2, 30)
        # Fixed: was `keras.Dense`; Dense lives in `keras.layers`.
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Run the forward pass and return the output layer's predictions."""
        Z = self.hidden1(inputs)
        # block1 is applied four times in a row, reusing the same weights.
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

## Losses and Metrics Based on Model Internals

In [62]:
class ReconstructionRegressor(keras.Model):
    """Regressor with an auxiliary reconstruction loss on its last hidden layer."""

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        dense_stack = [
            keras.layers.Dense(30, activation="selu",
                               kernel_initializer="lecun_normal")
            for _ in range(5)
        ]
        self.hidden = dense_stack
        self.out = keras.layers.Dense(output_dim)

    def build(self, batch_input_shape):
        """Create the reconstruction layer, sized to the number of input features."""
        self.reconstruct = keras.layers.Dense(batch_input_shape[-1])
        super().build(batch_input_shape)

    def call(self, inputs):
        """Return predictions and register the scaled reconstruction loss."""
        features = inputs
        for hidden_layer in self.hidden:
            features = hidden_layer(features)
        # Mean squared difference between the reconstruction and the inputs,
        # added to the model's losses with a weight of 0.05.
        squared_diff = tf.square(self.reconstruct(features) - inputs)
        self.add_loss(0.05 * tf.reduce_mean(squared_diff))
        return self.out(features)

## Computing Gradients Using Autodiff

In [65]:
def f(w1, w2):
    """Return 3*w1^2 + 2*w1*w2, the toy function differentiated below."""
    quadratic_term = 3 * w1**2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

In [67]:
w1, w2 = 5, 3
eps = 1e-6

In [68]:
(f(w1 + eps, w2) - f(w1, w2)) / eps

36.000003007075065

In [69]:
(f(w1, w2 + eps) - f(w1, w2)) / eps

10.000000003174137

In [71]:
# autodiff
w1, w2 = tf.Variable(5.0), tf.Variable(3.0)
with tf.GradientTape() as tape:
    z = f(w1, w2)
    
gradients = tape.gradient(z, [w1, w2])

In [73]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [75]:
# gradients = tape.gradient(z, [w1, w2]) # Error at calling it twice

In [78]:
# With persistent=True the tape can be queried more than once, so each
# partial derivative can be requested in a separate gradient() call.
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)
    
dz_dw1 = tape.gradient(z, w1) # => tensor 36.0
dz_dw2 = tape.gradient(z, w2) # => tensor 10.0, works fine now!
# Drop the reference once done so the persistent tape can be garbage-collected.
del tape

In [82]:
c1, c2 = tf.constant(5.0), tf.constant(3.0)
with tf.GradientTape() as tape:
    z = f(c1, c2)
    
gradients = tape.gradient(z, [c1,c2]) # return [None, None]

In [83]:
gradients

[None, None]

In [85]:
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)
    
gradients = tape.gradient(z, [c1, c2]) # returns [tensor 36.0, tensor 10.0]

In [86]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [87]:
def f(w1, w2):
    """Same value as 3*w1^2 + 2*w1*w2, with the cross term wrapped in stop_gradient."""
    quadratic_term = 3 * w1**2
    frozen_cross_term = tf.stop_gradient(2*w1*w2)
    return quadratic_term + frozen_cross_term

with tf.GradientTape() as tape:
    z = f(w1, w2) # same result as without stop_gradient()
    
gradients = tape.gradient(z, [w1, w2]) # => returns [tensor 30, None]

In [88]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [89]:
# my_softplus computes tf.exp(z) directly; exp(100) exceeds the float32 range,
# and the gradient evaluated here comes out as nan.
x = tf.Variable([100.0])
with tf.GradientTape() as tape:
    z = my_softplus(x)
    
tape.gradient(z, [x])

[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>]

In [90]:
# Custom gradient function
@tf.custom_gradient
def my_better_softplus(z):
    """Softplus, log(exp(z) + 1), with a hand-written gradient.

    Returns the forward value together with the function TensorFlow should
    use to backpropagate through it, as `tf.custom_gradient` expects.
    """
    exp = tf.exp(z)
    def my_softplus_gradients(grad):
        # Upstream gradient times 1 / (1 + 1/exp(z)), i.e. the softplus
        # derivative written so that a very large exp(z) gives a factor of 1.
        return grad / (1. + 1. / exp)
    # NOTE(review): the forward value still goes through exp(z), so it can
    # presumably overflow for large z even though the gradient stays finite.
    return tf.math.log(exp + 1), my_softplus_gradients

## Custom Training Loops

In [92]:
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal",
                      kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

In [93]:
def random_batch(X, y, batch_size=32):
    """Return `batch_size` rows of `X` and `y` sampled at the same random indices.

    Indices are drawn with `np.random.randint`, so a row may appear more
    than once in a batch.
    """
    chosen = np.random.randint(len(X), size=batch_size)
    features, targets = X[chosen], y[chosen]
    return features, targets

In [94]:
def print_status_bar(iteration, total, loss, metrics=None):
    """Print a one-line progress report that overwrites itself in place.

    Each of `loss` and the optional `metrics` must expose `.name` and
    `.result()`. The line starts with a carriage return; a newline is only
    emitted once `iteration` is no longer below `total`.
    """
    tracked = [loss] + (metrics or [])
    summary = " - ".join(
        "{}: {:.4f}".format(m.name, m.result()) for m in tracked)
    if iteration < total:
        line_end = ""
    else:
        line_end = "\n"
    print("\r{}/{} - ".format(iteration, total) + summary, end=line_end)

In [96]:
# Hyperparameters and training objects for the custom training loop.
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
# `learning_rate` is the supported argument name; `lr` is a deprecated alias.
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()  # running mean of the total loss per epoch
metrics = [keras.metrics.MeanAbsoluteError()]

In [104]:
# Custom training loop
# NOTE(review): X_train / y_train are whatever earlier cells defined (in this
# notebook, Fashion MNIST images and integer labels) -- confirm that an MSE
# regression with a Dense(1) output is the intended setup for this data.
for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        # Pass batch_size explicitly so the sampled batch always matches the
        # size used for n_steps and for the progress counter below.
        X_batch, y_batch = random_batch(X_train, y_train, batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # model.losses holds the layers' regularization losses.
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    # Final call with iteration == total, which ends the status line.
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    # Reset the streaming metrics so each epoch is reported separately.
    for metric in [mean_loss] + metrics:
        metric.reset_states()

Epoch 1/5
55000/55000 - mean: 8.4472 - mean_absolute_error: 2.5054
Epoch 2/5
55000/55000 - mean: 8.3269 - mean_absolute_error: 2.5009
Epoch 3/5
55000/55000 - mean: 8.3252 - mean_absolute_error: 2.5026
Epoch 4/5
55000/55000 - mean: 8.3057 - mean_absolute_error: 2.5027
Epoch 5/5
55000/55000 - mean: 8.2800 - mean_absolute_error: 2.4980


In [107]:
# Apply weight constraints after the optimizer step (apply_gradients):
# each variable that has a constraint is overwritten with the constrained value.
for variable in model.variables:
    if variable.constraint is not None:
        variable.assign(variable.constraint(variable))

# TensorFlow Functions and Graphs

In [109]:
def cube(x):
    """Return x raised to the third power."""
    return pow(x, 3)

In [116]:
cube(2)

8

In [117]:
cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [118]:
tf_cube = tf.function(cube)
tf_cube

<tensorflow.python.eager.def_function.Function at 0x7fcf700d0be0>

In [119]:
tf_cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [120]:
tf_cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [121]:
# Decorator form of tf.function(cube): the plain Python function stays
# reachable through tf_cube.python_function (used in the next cells).
@tf.function
def tf_cube(x):
    return x ** 3

In [122]:
tf_cube.python_function(2)

8

In [125]:
print( tf.autograph.to_code(tf_cube.python_function) )

def tf__tf_cube(x):
    with ag__.FunctionScope('tf_cube', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
        do_return = False
        retval_ = ag__.UndefinedReturnValue()
        try:
            do_return = True
            retval_ = (ag__.ld(x) ** 3)
        except:
            do_return = False
            raise
        return fscope.ret(retval_, do_return)

