In [1]:
import tensorflow as tf

In [2]:
# Constants are immutable tensors.
# A nested list of floats builds a 2x3 float32 matrix (this is the expression
# that produces the tensor displayed below).
tf.constant([[1., 2., 3.], [4., 5., 6.]])

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [3]:
# A single Python int becomes a scalar tensor (shape=(), dtype int32).
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [4]:
# Keep the 2x3 float matrix in `t`; the following cells reuse it.
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
t.shape

TensorShape([2, 3])

In [6]:
# Pick column 1 of every row; tf.newaxis appends a length-1 axis, giving shape (2, 1).
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [7]:
# The scalar is added to every element of the tensor.
t + 10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [8]:
# Element-wise square root.
tf.sqrt(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1.       , 1.4142135, 1.7320508],
       [2.       , 2.236068 , 2.4494898]], dtype=float32)>

In [9]:
# Swap the two axes: (2, 3) -> (3, 2).
tf.transpose(t)

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[1., 4.],
       [2., 5.],
       [3., 6.]], dtype=float32)>

In [10]:
# `@` is matrix multiplication: (3, 2) @ (2, 3) -> (3, 3).
tf.transpose(t) @ t

<tf.Tensor: shape=(3, 3), dtype=float32, numpy=
array([[17., 22., 27.],
       [22., 29., 36.],
       [27., 36., 45.]], dtype=float32)>

In [13]:
# TensorFlow never converts dtypes implicitly: float32 + int32 raises.
try:
    tf.constant(2.) + tf.constant(2)
except tf.errors.InvalidArgumentError as e:
    # Catch only the dtype-mismatch error so any unrelated failure still surfaces.
    print(e)

cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2]


In [14]:
# Cast the int32 operand to float32 explicitly so both operands share a dtype.
tf.constant(2.) + tf.cast(tf.constant(2), dtype=tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=4.0>

In [16]:
# Mutable data: unlike a constant, a tf.Variable can be modified after creation.
# The cells below update `v` step by step, so they must be run in order.
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [17]:
# Overwrite the whole variable with twice its current values.
v.assign(2 * v)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [19]:
# Update a single cell (row 0, column 1).
v[0, 1].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [20]:
# Update a whole slice: column 2 receives one value per row.
v[:, 2].assign([0, 1])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [21]:
# Several single-cell assignments in one call:
# cell (0, 0) becomes 100. and cell (1, 2) becomes 200.
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

# custom loss function

In [22]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load the California housing data and split it into train / validation / test sets.
# Fixed seeds keep the splits identical across kernel restarts, so results are reproducible.
housing = fetch_california_housing()
X_train_f, X_test, y_train_f, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(X_train_f, y_train_f, random_state=42)

# Fit the scaler on the training split only, then reuse its statistics for the other splits.
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_val_s = scaler.transform(X_val)
X_test_s = scaler.transform(X_test)

In [23]:
def create_huber(threshold=1.0):
    """Return a Huber loss function configured with `threshold`.

    The returned function takes (y_true, y_pred) and yields the per-element
    loss: quadratic where |error| < threshold, linear elsewhere.
    """
    def huber(y_true, y_pred):
        residual = y_true - y_pred
        # Quadratic branch, used for small errors.
        quadratic = tf.square(residual) / 2
        # Linear branch, used once the error reaches the threshold.
        linear = threshold * tf.abs(residual) - threshold ** 2 / 2
        small_error = tf.abs(residual) < threshold
        return tf.where(small_error, quadratic, linear)
    return huber

In [26]:
from tensorflow import keras
# Per-sample feature shape: X_train's shape without its first (sample) dimension.
input_shape = X_train.shape[1:]

# One SELU hidden layer followed by a single linear output unit.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal", input_shape=input_shape),
    keras.layers.Dense(1)
])

# Optimize the custom Huber loss (threshold 2.0) and report MAE alongside it.
model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=["mae"])

In [27]:
# validation_data is documented as an (x_val, y_val) tuple, so pass a tuple rather than a list.
model.fit(X_train_s, y_train, epochs=2, validation_data=(X_val_s, y_val), batch_size=64)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x17c3cfcd0>

In [28]:
# Save the compiled model (including its training configuration) to an HDF5 file.
model.save("custom_loss.h5")

In [29]:
# A function loss is saved under its own name, which here is the inner function's
# name "huber" (not "create_huber"), so that is the key custom_objects must provide.
# With a non-matching key the name is resolved without our function; Keras most likely
# falls back to its own built-in "huber" loss, silently dropping the 2.0 threshold.
model = keras.models.load_model("custom_loss.h5", custom_objects={"huber": create_huber(2.)})

In [30]:
# Continue training the reloaded model; validation_data is passed as the documented (x, y) tuple.
model.fit(X_train_s, y_train, epochs=2, validation_data=(X_val_s, y_val), batch_size=64)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x17ced4670>

In [31]:
class HuberLoss(keras.losses.Loss):
    """Huber loss whose threshold is stored in the config, so it survives save/load."""
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
    def call(self, y_true, y_pred):
        """Per-element loss: quadratic below the threshold, linear at or above it."""
        residual = y_true - y_pred
        quadratic = tf.square(residual) / 2
        linear = self.threshold * tf.abs(residual) - self.threshold ** 2 / 2
        small_error = tf.abs(residual) < self.threshold
        return tf.where(small_error, quadratic, linear)
    def get_config(self):
        """Extend the base config with this loss's threshold."""
        config = dict(super().get_config())
        config["threshold"] = self.threshold
        return config

In [32]:
# Recompile with the class-based loss (threshold 2.0) instead of the closure.
model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"])

In [33]:
# validation_data is documented as an (x_val, y_val) tuple, so pass a tuple rather than a list.
model.fit(X_train_s, y_train, epochs=2, validation_data=(X_val_s, y_val), batch_size=64)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x17d0ae170>

In [34]:
# The loss object attached to the model exposes the threshold it was built with.
model.loss.threshold

2.0

In [None]:
# other custom parameters

# if we want it to be saved implement as classes

# custom activation function 
def my_softplus(z):
    """Softplus activation, log(exp(z) + 1), written out by hand.

    NOTE(review): exp(z) can overflow for large z; TensorFlow also ships
    tf.nn.softplus if a built-in implementation is preferred.
    """
    exp_z = tf.exp(z)
    return tf.math.log(exp_z + 1.0)

# custom weight initializer
# keras.initializers.Initializer
def my_glorot_initializer(shape, dtype=tf.float32):
    """Draw initial weights from a normal distribution with stddev sqrt(2 / (shape[0] + shape[1]))."""
    fan_sum = shape[0] + shape[1]
    scale = tf.sqrt(2. / fan_sum)
    return tf.random.normal(shape, stddev=scale, dtype=dtype)

# custom regularizer
# keras.regularizers.Regularizer
def my_l1_regulizer(weights):
    """L1 penalty: the sum of absolute values of the weights scaled by 0.01."""
    scaled = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled))

# custom constraint
# keras.constraints.Constraint
def my_positive_weights(weights):
    """Constraint that replaces every negative weight with zero and keeps the rest."""
    is_negative = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

# keras.layers.Layer
# Plug all four custom functions into one Dense layer.
layer = keras.layers.Dense(
    30,
    activation=my_softplus,
    kernel_initializer=my_glorot_initializer,
    kernel_regularizer=my_l1_regulizer,
    kernel_constraint=my_positive_weights,
)

In [35]:
# class example
class MyL1Regularizer(keras.regularizers.Regularizer):
    """L1 regularizer with a configurable factor, exposed through get_config()."""
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        """Return the sum of absolute values of the factor-scaled weights."""
        scaled = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled))
    def get_config(self):
        """Return the constructor arguments needed to rebuild this regularizer."""
        return dict(factor=self.factor)

In [None]:
# A plain function can be passed as a metric: MSE is optimized, Huber (threshold 2.0) is only reported.
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

In [None]:
# custom Metric - stemming metric implementation
class HuberMetric(keras.metrics.Metric):
    """Streaming metric: mean Huber error over all elements seen since the last reset.

    It accumulates a running sum of per-element Huber values and a running
    element count, so the result is a mean over all batches seen so far.
    """
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # Pass the weight name by keyword so the call does not depend on the
        # positional parameter order of add_weight.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate one batch. NOTE: sample_weight is accepted but not applied."""
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        """Return the accumulated sum divided by the accumulated element count."""
        return self.total / self.count
    def get_config(self):
        """Extend the base config with the threshold."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}


In [None]:
# layer that does not have weights
# Lambda wraps a plain function as a layer; this one applies tf.exp to its input.
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

In [None]:
class MyDense(keras.layers.Layer):
    """Hand-written fully connected layer: activation(X @ kernel + bias).

    Args:
        units: number of output units.
        activation: activation function, its Keras name (e.g. "relu"), or None
            for no activation.
    """
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        # Resolve names and None to a callable: without this, the default
        # activation=None (or a string) made call() fail when invoking it.
        self.activation = keras.activations.get(activation)
    def build(self, batch_input_shape):
        """Create the kernel and bias once the input feature size is known."""
        self.kernel = self.add_weight(
            name="kernel",
            shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal",
        )
        self.bias = self.add_weight(
            name="bias",
            shape=[self.units],
            initializer="zeros",
        )
        super().build(batch_input_shape)
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)
    def compute_output_shape(self, batch_input_shape):
        """Same as the input shape, with the last dimension replaced by `units`."""
        # Wrapping in TensorShape lets plain tuples/lists work as well.
        input_dims = tf.TensorShape(batch_input_shape).as_list()
        return tf.TensorShape(input_dims[:-1] + [self.units])
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units, "activation": keras.activations.serialize(self.activation)}


In [None]:
# 2 inputs, 3 outputs
class MyMultiLayer(keras.layers.Layer):
    """Layer taking a pair of inputs and returning their sum, product and quotient."""
    def call(self, X):
        first, second = X
        total = first + second
        product = first * second
        quotient = first / second
        return [total, product, quotient]
    def compute_output_shape(self, batch_input_shape):
        # All three outputs are reported with the first input's shape.
        first_shape, _ = batch_input_shape
        return [first_shape] * 3

In [36]:
class MyGaussianNoise(keras.layers.Layer):
    """Adds zero-mean Gaussian noise during training; passes inputs through otherwise.

    Args:
        stddev: standard deviation of the noise.
    """
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev
    def call(self, X, training=None):
        # Noise is applied only when training is truthy.
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        return X
    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape
    def get_config(self):
        """Include stddev so the layer can be rebuilt from its config."""
        base_config = super().get_config()
        return {**base_config, "stddev": self.stddev}

In [38]:
class ResidualBlock(keras.layers.Layer):
    """Stack of ELU Dense layers whose output is added back onto the block's input."""
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        stack = []
        for _ in range(n_layers):
            stack.append(
                keras.layers.Dense(n_neurons, activation="elu", kernel_initializer="he_normal")
            )
        self.hidden = stack
    def call(self, inputs):
        transformed = inputs
        for dense in self.hidden:
            transformed = dense(transformed)
        # Skip connection: add the original input to the transformed one.
        return inputs + transformed

In [37]:
class ResidualRegressor(keras.models.Model):
    """Regressor: a Dense layer, block1 applied four times, block2 once, then the output layer."""
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)
    def call(self, inputs):
        features = self.hidden1(inputs)
        # The same block1 instance is applied on every pass.
        n_passes = 1 + 3
        for _ in range(n_passes):
            features = self.block1(features)
        features = self.block2(features)
        return self.out(features)

# To save the whole model, every custom layer and the model itself must implement get_config(); otherwise, persist only the weights with save_weights() and restore them with load_weights().

In [39]:
# Train the subclassed model with a single output, then predict on the scaled test set.
model = ResidualRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_s, y_train, epochs=5)
y_pred = model.predict(X_test_s)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [41]:
class ReconstructingRegressor(keras.Model):
    """Regressor with an auxiliary loss for reconstructing its own inputs.

    Five SELU hidden layers feed both the main output layer and a
    reconstruction layer; the reconstruction error is added as an extra loss.
    """
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [
            keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal")
            for _ in range(5)
        ]
        self.out = keras.layers.Dense(output_dim)
    def build(self, batch_input_shape):
        # The reconstruction layer needs one unit per input feature, which is
        # only known once the input shape is available.
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        super().build(batch_input_shape)
    def call(self, inputs):
        # Z must be initialised before the loop (the original had this line
        # misplaced after the `for` header, which is a syntax error).
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        # Mean squared reconstruction error, weighted 0.05, added to the model's losses.
        self.add_loss(0.05 * recon_loss)
        return self.out(Z)

In [42]:
def f(w1, w2):
    """Toy function 3*w1**2 + 2*w1*w2 used to demonstrate differentiation."""
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

In [43]:
w1, w2 = 5, 3
eps = 1e-6

# Forward-difference approximation of the partial derivative with respect to w1.
(f(w1 + eps, w2) - f(w1, w2)) / eps

36.000003007075065

In [44]:
# Forward-difference approximation of the partial derivative with respect to w2.
(f(w1, w2 + eps) - f(w1, w2)) / eps

10.000000003174137

In [46]:
# Finite differences need an extra call to f for every parameter, which is slow
# for a huge number of weights; use autodiff instead.

w1, w2 = tf.Variable(5.), tf.Variable(3.)
# The tape records the operations executed inside the block.
with tf.GradientTape() as tape:
    z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [49]:
# The tape from the previous cell was already used once; asking it for
# gradients a second time raises a RuntimeError.
try:
    tape.gradient(z, w1)
except RuntimeError as e:
    print(e)

A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)


In [50]:
# To avoid that error, make the tape persistent so gradient() can be called more than once.
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)

# Remember to delete a persistent tape once you are done with it.
del tape

In [51]:
# By default the tape only watches variables; this can be overridden by
# calling tape.watch() on any tensor, such as these constants.
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [52]:
with tf.GradientTape() as tape:
    z1 = f(w1, w2 + 2.)
    z2 = f(w1, w2 + 5.)
    z3 = f(w1, w2 + 7.)

# With a list of targets, the result is the gradient of their sum
# (one value per source), not one gradient per target.
tape.gradient([z1, z2, z3], [w1, w2])

[<tf.Tensor: shape=(), dtype=float32, numpy=136.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=30.0>]

In [None]:
"""
3 * w1 ** 2 + 2 * w1 * w2

f'(w1) --> 6 * w1 + 2 * w2
f'(w2) --> 2 * w1

w1, w2 --> f'(w1), f'(w2)
5, 5 --> 40, 10
5, 8 --> 46, 10
5, 10 --> 50, 10

sum(f'(w1)) = 40 + 46 + 50 = 136
sum(f'(w2)) = 10 + 10 + 10 = 30
"""

In [55]:
# A persistent tape is needed because gradient() is called once per target.
with tf.GradientTape(persistent=True) as tape:
    z1 = f(w1, w2 + 2.)
    z2 = f(w1, w2 + 5.)
    z3 = f(w1, w2 + 7.)

# same as above: compute each target's gradients separately, then sum them
summed_gradients = tf.reduce_sum(tf.stack([tape.gradient(z, [w1, w2]) for z in (z1, z2, z3)]), axis=0)

# Release the persistent tape once all gradients are computed.
del tape
summed_gradients

<tf.Tensor: shape=(2,), dtype=float32, numpy=array([136.,  30.], dtype=float32)>

In [56]:
# NOTE(review): this redefines `f`, shadowing the version defined in an earlier cell;
# re-running earlier cells after this one would give different gradients.
def f(w1, w2):
    # stop_gradient keeps the value of 2*w1*w2 in the forward pass but blocks
    # gradients from flowing through that term.
    return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2)

with tf.GradientTape() as tape:
    z = f(w1, w2)

tape.gradient(z, [w1, w2])

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [None]:
"""
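f'(w1) = 6*w1  (= 30 for w1 = 5)
f'(w2) = 0  (TensorFlow returns None rather than 0, because stop_gradient leaves no differentiable path from z to w2)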
"""

In [63]:
# One L2 regularizer (factor 0.05) is shared by both layers; its penalties
# are picked up from model.losses in the custom training loop below.
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal", kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg),
])

In [64]:
import numpy as np

def random_batch(X, y, batch_size=64):
    """Return `batch_size` randomly chosen (X, y) pairs, sampled with replacement."""
    n_samples = len(X)
    picks = np.random.randint(n_samples, size=batch_size)
    batch_X = X[picks]
    batch_y = y[picks]
    return batch_X, batch_y

In [74]:
def print_status_bar(iter, total, loss, metrics=None):
    """Print a one-line progress report that overwrites itself until `iter` reaches `total`.

    `loss` and every entry of `metrics` must expose `.name` and `.result()`.
    """
    tracked = [loss] + (metrics or [])
    parts = []
    for m in tracked:
        parts.append("{}: {:.4f}".format(m.name, m.result()))
    summary = " - ".join(parts)
    # Stay on the same line while in progress; end the line once finished.
    if iter < total:
        line_end = ""
    else:
        line_end = "\n"
    print("\r{}/{} - ".format(iter, total) + summary, end=line_end)

In [66]:
# Hyperparameters and helper objects for the custom training loop.
n_epochs = 5
batch_size = 64
# Number of batches per epoch (integer division drops the remainder).
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
# Running mean of the total loss, plus any extra metrics to report.
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

In [75]:
# Custom training loop
for epoch in range(1, n_epochs + 1):
    print(f"Epoch {epoch}/{n_epochs}")
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train_s, y_train)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # Total loss = main loss + the model's extra losses (here the L2 penalties).
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # Apply weight constraints by hand after the optimizer step.
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    # Final call with iter == total so the status line ends with a newline.
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    # Start every epoch with fresh accumulators; reset_state() replaces the
    # deprecated reset_states().
    for metric in [mean_loss] + metrics:
        metric.reset_state()

Epoch 1/5
11610/11610 - mean: 0.6580 - mean_absolute_error: 0.5373
Epoch 2/5
11610/11610 - mean: 0.6399 - mean_absolute_error: 0.5327
Epoch 3/5
11610/11610 - mean: 0.6601 - mean_absolute_error: 0.5370
Epoch 4/5
11610/11610 - mean: 0.6566 - mean_absolute_error: 0.5373
Epoch 5/5
11610/11610 - mean: 0.6885 - mean_absolute_error: 0.5494


In [76]:
def cube(x):
    """Return x raised to the third power."""
    return pow(x, 3)
# The plain Python function also works on tensors.
cube(tf.constant(2))

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [77]:
# tf.function analyzes the computations performed by cube and converts them into a TensorFlow graph.
tf_cube = tf.function(cube)
tf_cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [78]:
# The same conversion, written as a decorator.
# NOTE(review): this rebinds the name `cube`, replacing the plain Python function defined earlier.
@tf.function
def cube(x):
    return x**3
cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [83]:
class LayerNormalization(keras.layers.Layer):
    """Hand-written layer normalization over the last axis.

    Computes a * (X - mean) / sqrt(variance + e) + b, where `a` and `b` are
    learned per-feature weights.
    """
    def __init__(self, e=0.001, **kwargs):
        self.e = e
        super().__init__(**kwargs)
    def build(self, batch_input_shape):
        feature_shape = batch_input_shape[-1:]
        # Scale starts at ones and offset at zeros.
        self.a = self.add_weight(name="a", shape=feature_shape, initializer="ones")
        self.b = self.add_weight(name="b", shape=feature_shape, initializer="zeros")
        super().build(batch_input_shape)
    def call(self, X):
        # tf.nn.moments returns (mean, variance); keepdims keeps them broadcastable against X.
        mean, variance = tf.nn.moments(X, axes=-1, keepdims=True)
        centered = X - mean
        denominator = tf.sqrt(variance + self.e)
        return self.a * centered / denominator + self.b

In [84]:
# Sanity check: the custom layer should match Keras's built-in LayerNormalization.
custom_norm = LayerNormalization()
keras_norm = keras.layers.LayerNormalization()

# Cast to float32 before feeding both layers the same input.
X = X_train.astype(np.float32)
# Mean absolute difference between the two outputs; a value near zero means they agree.
tf.reduce_mean(keras.losses.mean_absolute_error(
    keras_norm(X), custom_norm(X)
))

<tf.Tensor: shape=(), dtype=float32, numpy=3.8659426e-08>