In [15]:
#Creating our own tensors (multidimensional arrays)
import tensorflow as tf
import keras
import numpy as np

tensor_matrix = tf.constant([[1., 2., 3.], [4., 5., 6.]])
tensor_scalar = tf.constant(66)
print("Tensor Matrix: \n"+str(tensor_matrix))
print("Tensor Scalar: \n"+str(tensor_scalar))

Tensor Matrix: 
tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]], shape=(2, 3), dtype=float32)
Tensor Scalar: 
tf.Tensor(66, shape=(), dtype=int32)


In [16]:
#Mutable Variables
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
print(v)

v.assign(2 * v) # => [[2., 4., 6.], [8., 10., 12.]]
v[0, 1].assign(42) # => [[2., 42., 6.], [8., 10., 12.]]
v[:, 2].assign([0., 1.]) # => [[2., 42., 0.], [8., 10., 1.]]
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])
print(v)

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>


In [17]:
#Example custom loss function with ability for import and export
# def create_huber(threshold=1.0)
#     def huber_fn(y_true, y_pred):
#         error = y_true - y_pred
#         is_small_error = tf.abs(error) < threshold
#         squared_loss = tf.square(error) / 2
#         linear_loss = threshold * tf.abs(error) - threshold**2 / 2
#         return tf.where(is_small_error, squared_loss, linear_loss)
#     return huber_fn

#When loss function created it keeps threshold parameter mapped with a dictionary, which then returns that dictionary mapping with the get_config function
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

#...
# model.compile(loss=HuberLoss(2.), optimizer="nadam")
# model.fit(X_train, y_train, [...])
# model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",custom_objects={"HuberLoss": HuberLoss})


In [18]:
#Custom Activations/Initializers/Regularizers/Contraints
def my_softplus(z): # return value is just tf.nn.softplus(z)
    return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return {"factor": self.factor}

def my_positive_weights(weights): # return value is just tf.nn.relu(weights)
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

#Then use them as normal
# layer = keras.layers.Dense(30, activation=my_softplus,
#     kernel_initializer=my_glorot_initializer,
#     kernel_regularizer=my_l1_regularizer,
#     kernel_constraint=my_positive_weights)


In [19]:
#Streaming Metric using Huber!
class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # handles base args (e.g., dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        #Keep track of state across batches
        self.total = self.add_weight("total", initializer="zeros")
        self.count = self.add_weight("count", initializer="zeros")
    #Updates variables
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}


In [20]:
#custom layers

#No weight layer wrapped in Lambda
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

#A different one
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    #Create layer variables like weights
    def build(self, batch_input_shape):
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal")
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros")
        super().build(batch_input_shape) # must be at the end

    #NOTE: @ is for matrix multiplication
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)

    #Usually not necessary unless layer size is dynamic
    def compute_output_shape(self, batch_input_shape):
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units,
            "activation": keras.activations.serialize(self.activation)}

#A Multiple Input Layer!
class MyMultiLayer(keras.layers.Layer):
    #Input is a tuple
    def call(self, X):
        X1, X2 = X
        return [X1 + X2, X1 * X2, X1 / X2]
    def compute_output_shape(self, batch_input_shape):
        b1, b2 = batch_input_shape
        return [b1, b1, b1] # should probably handle broadcasting rules

#Different rules for training and testing
class MyGaussianNoise(keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev
    def call(self, X, training=None):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X
    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape


In [21]:
#Creating a Particular Custom Model

#The Residual Block
class ResidualBlock(keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.n_layers = n_layers
        self.n_neurons = n_neurons
        self.hidden = [keras.layers.Dense(n_neurons, activation="elu",
            kernel_initializer="he_normal") for _ in range(n_layers)]
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "n_layers": self.n_layers, "n_neurons": self.n_neurons}

#SubClass for our Model
class ResidualRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.output_dim = output_dim
        self.hidden1 = keras.layers.Dense(30, activation="elu",
            kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)
    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "output_dim": self.output_dim}


In [22]:
#Using Model info for Metrics (Reconstruction Loss)
class ReconstructingRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu",
                kernel_initializer="lecun_normal") for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
    #Extra dense layer created used for reconstruction
    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        super().build(batch_input_shape)
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        self.add_loss(0.05 * recon_loss)
        return self.out(Z)


In [24]:
#Simple model with custom training loop
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
        keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal",
        kernel_regularizer=l2_reg),
        keras.layers.Dense(1, kernel_regularizer=l2_reg)
    ])

#Tiny randomly sampled batches
def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

#Display training status
def print_status_bar(iteration, total, loss, metrics=None):
    metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
        for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics, end=end)

#Data
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

#Hyperparams
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

#The L00pz
for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train_scaled, y_train)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)

        #Here is where you would apply other transformations

        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        #Here is where you would add weight constraings
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))


        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        metric.reset_states()

Epoch 1/5
11610/11610 - mean: 2.1089 - mean_absolute_error: 0.9227
Epoch 2/5
11610/11610 - mean: 1.4084 - mean_absolute_error: 0.9118
Epoch 3/5
11610/11610 - mean: 1.3771 - mean_absolute_error: 0.9075
Epoch 4/5
11610/11610 - mean: 1.3659 - mean_absolute_error: 0.9082
Epoch 5/5
11610/11610 - mean: 1.4011 - mean_absolute_error: 0.9218


In [28]:
#Taking a look at TensorFlow graphs

#Arbitraty funciton
def cube(x): 
    return x**3

tf_cube = tf.function(cube)
tf_cube(tf.constant(2.0))

#OR
@tf.function
def tf_cube(x):
    return x**3

#Does the same thing but is optimized for tensors under the hood

#Using the original funciton
tf_cube.python_function(3)
tf.autograph.to_code(tf_cube.python_function)

"def tf__tf_cube(x):\n    with ag__.FunctionScope('tf_cube', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:\n        do_return = False\n        retval_ = ag__.UndefinedReturnValue()\n        try:\n            do_return = True\n            retval_ = (ag__.ld(x) ** 3)\n        except:\n            do_return = False\n            raise\n        return fscope.ret(retval_, do_return)\n"