In [1]:
import tensorflow as tf
from tensorflow import keras

In [2]:
# define a matrix tensor
tf.constant([[1, 2, 3], [4, 5, 6]])

Metal device set to: Apple M1

systemMemory: 16.00 GB
maxCacheSize: 5.33 GB



2023-02-22 11:48:28.245544: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-02-22 11:48:28.246105: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6]], dtype=int32)>

In [3]:
# define a scalar tensor
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [4]:
# define a matrix tensor and print some properties
t = tf.constant([[1, 2, 3], [4, 5, 6]])
print(t.shape)
print(t.dtype)

(2, 3)
<dtype: 'int32'>


In [5]:
# indexing
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[2, 3],
       [5, 6]], dtype=int32)>

In [6]:
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=int32, numpy=
array([[2],
       [5]], dtype=int32)>

In [7]:
# operations
t + 10  # tf.add(t, 10)

<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[11, 12, 13],
       [14, 15, 16]], dtype=int32)>

In [8]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[ 1,  4,  9],
       [16, 25, 36]], dtype=int32)>

In [9]:
# matrix multiplication
# TensorFlow tensors don't have a .T attribute, so use tf.transpose() instead
t @ tf.transpose(t)  # tf.matmul(t, tf.transpose(t))

<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[14, 32],
       [32, 77]], dtype=int32)>

In [12]:
# some reduction functions are named differently than in NumPy
print(tf.reduce_max(t))  # counterpart of np.max
print(tf.reduce_mean(t))  # counterpart of np.mean; note: t is int32, so the mean is also int32 (3 in the output below, not 3.5)

tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)


In [13]:
# NumPy arrays can be passed to TensorFlow operations, and tensors to NumPy operations (and vice versa)
import numpy as np
a = np.array([2, 4, 5])
print(tf.constant(a))
print(t.numpy())
print(tf.square(a))
print(np.square(t))

tf.Tensor([2 4 5], shape=(3,), dtype=int64)
[[1 2 3]
 [4 5 6]]
tf.Tensor([ 4 16 25], shape=(3,), dtype=int64)
[[ 1  4  9]
 [16 25 36]]


In [14]:
# NumPy defaults to 64-bit precision while TensorFlow defaults to 32-bit, so remember to specify the dtype when converting from one to the other
tf.constant(a, dtype=tf.float32)

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([2., 4., 5.], dtype=float32)>

In [15]:
# Tensors of different types cannot operate with one another
tf.constant(2.) + tf.constant(40)

InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2]

In [16]:
tf.constant(2.) + tf.constant(40., dtype=tf.float64)


InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a double tensor [Op:AddV2]

In [17]:
# variables
v = tf.Variable([[1., 2., 3.], [4., 5., 6.,]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [18]:
# mutate a variable
v.assign(2 * v)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [22]:
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,   4.,   6.],
       [  8.,  10., 200.]], dtype=float32)>

# Custom Loss Functions

In [None]:
# define a custom loss function
def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    Residuals with absolute value below 1 contribute ``residual**2 / 2``;
    all other residuals contribute ``|residual| - 0.5``.
    """
    residual = y_true - y_pred
    abs_residual = tf.abs(residual)
    quadratic_part = tf.square(residual) / 2
    linear_part = abs_residual - 0.5
    # pick the quadratic branch wherever the residual is small
    return tf.where(abs_residual < 1, quadratic_part, linear_part)

In [None]:
# use the custom loss function when compiling a model
model.compile(loss=huber_fn, optimizer="nadam")

In [None]:
# save and load a model that uses a custom function
model.save("my_custom_keras_model.h5")
model = keras.models.load_model("my_custom_keras_model.h5",
                                custom_objects={"huber_fn": huber_fn})


In [None]:
# more rigorous way of making a custom loss function
class HuberLoss(keras.losses.Loss):
    """Huber loss with a configurable threshold.

    Residuals whose absolute value is below ``threshold`` are penalised
    quadratically (``residual**2 / 2``); the remaining ones are penalised
    linearly (``threshold * |residual| - threshold**2 / 2``).
    """

    def __init__(self, threshold=1.0, **kwargs):
        """Store ``threshold`` and forward the other keyword arguments to the parent class."""
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss between ``y_true`` and ``y_pred``."""
        residual = y_true - y_pred
        abs_residual = tf.abs(residual)
        quadratic_part = tf.square(residual) / 2
        linear_part = self.threshold * abs_residual - self.threshold**2 / 2
        return tf.where(abs_residual < self.threshold, quadratic_part, linear_part)

    def get_config(self):
        """Return the parent config extended with ``threshold``.

        Including every custom constructor parameter here is what allows the
        object to be re-created as ``HuberLoss(**config)`` after saving.
        """
        return dict(super().get_config(), threshold=self.threshold)

# compile model with custom loss class
model.compile(loss=HuberLoss(2.0), optimizer="nadam")

# save and load model
model.save("my_custom_keras_model.h5")
model = keras.models.load_model("my_custom_keras_model.h5",
                                custom_objects={"HuberLoss": HuberLoss})


# Custom Activation Function, Initializer, and Regularizer

In [None]:
def my_softplus(z):  # equivalent to tf.nn.softplus
    """Softplus activation, ``log(exp(z) + 1)``, written in an overflow-safe form.

    The direct formula evaluates ``exp(z)``, which overflows to ``inf`` for
    large positive ``z``. This version uses the identity
    ``log(1 + exp(z)) == max(z, 0) + log1p(exp(-|z|))``, whose exponential
    argument is never positive and therefore cannot overflow.

    Args:
        z: floating-point tensor of pre-activations.

    Returns:
        Tensor with the same shape as ``z`` holding ``softplus(z)``.
    """
    return tf.maximum(z, 0.) + tf.math.log1p(tf.exp(-tf.abs(z)))

def my_glorot_initializer(shape, dtype=tf.float32):
    """Draw initial weights from a normal distribution with Glorot scaling.

    The standard deviation is ``sqrt(2 / (shape[0] + shape[1]))``. The
    original author describes this as an equivalent of
    ``keras.initializers.glorot_normal``.
    """
    fan_in, fan_out = shape[0], shape[1]
    glorot_stddev = tf.sqrt(2. / (fan_in + fan_out))
    return tf.random.normal(shape, stddev=glorot_stddev, dtype=dtype)

def my_l1_regularizer(weights):
    """L1 penalty with a fixed factor of 0.01 (cf. ``keras.regularizers.l1(0.01)``).

    Returns the sum of the absolute values of ``0.01 * weights``.
    """
    scaled_weights = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled_weights))

def my_positive_weights(weights):
    """Constraint that replaces negative weights with zero (cf. ``tf.nn.relu(weights)``)."""
    is_negative = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

# layer with custom functions
layer = keras.layers.Dense(30, 
                           activation=my_softplus,
                           kernel_initializer=my_glorot_initializer,
                           kernel_regularizer=my_l1_regularizer,
                           kernel_constraint=my_positive_weights)

In [None]:
# make a custom regularizer from a parent class
class MyL1Regularizer(keras.regularizers.Regularizer):
    """L1 regularizer whose strength is set by ``factor``."""

    def __init__(self, factor):
        """Store the regularization factor.

        Note: if this constructor is extended to accept ``**kwargs``, it
        should also call ``super().__init__(**kwargs)``.
        """
        self.factor = factor

    def __call__(self, weights):
        """Return the sum of the absolute values of ``factor * weights``."""
        scaled_weights = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled_weights))

    def get_config(self):
        """Return the constructor arguments needed to re-create this regularizer."""
        return dict(factor=self.factor)

# Streaming Metrics

In [24]:
from tensorflow import keras

In [25]:
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])


<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [26]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [27]:
# access the latest result
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [28]:
# access information about the calls
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [29]:
# reset the metric (reset_state() supersedes the deprecated reset_states())
precision.reset_state()

In [32]:
# check the reset
print(precision.result())
print(precision.variables)

tf.Tensor(0.0, shape=(), dtype=float32)
[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>, <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>]


In [34]:
# make custom streaming metric with hyperparameters with parent class
class HuberMetric(keras.metrics.Metric):
    """Streaming metric that tracks the mean Huber loss over all batches seen so far.

    Two weights are maintained across calls: ``total`` (running sum of the
    element-wise Huber loss) and ``count`` (running number of elements in
    ``y_true``). ``result()`` returns their ratio.
    """

    def __init__(self, threshold=1.0, **kwargs):
        """Create the metric.

        Args:
            threshold: boundary between the quadratic and the linear part of
                the Huber loss.
            **kwargs: forwarded to ``keras.metrics.Metric``.
        """
        super().__init__(**kwargs)
        self.threshold = threshold

        def huber_fn(y_true, y_pred):
            # Huber loss bound to this metric's threshold. It is defined here
            # so the class does not depend on a helper defined elsewhere.
            error = y_true - y_pred
            is_small_error = tf.abs(error) < threshold
            squared_loss = tf.square(error) / 2
            linear_loss = threshold * tf.abs(error) - threshold**2 / 2
            return tf.where(is_small_error, squared_loss, linear_loss)

        self.huber_fn = huber_fn
        # state variables accumulated across update_state() calls;
        # `name` is passed by keyword so it cannot be mistaken for another argument
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the Huber loss of one batch.

        ``sample_weight`` is accepted but not used by this implementation.
        """
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the mean Huber loss so far (0 when nothing has been accumulated)."""
        # divide_no_nan returns 0 instead of NaN while count is still 0
        return tf.math.divide_no_nan(self.total, self.count)

    def get_config(self):
        """Return the parent config extended with ``threshold``."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

    # one could also override the .reset_state() method if necessary.
    # By default, all variables/weights are reset to 0



# Custom Layers

In [3]:
# make a layer that computes the exponential of the inputs
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

In [None]:
# make custom, simplified version of a dense layer
class MyDense(keras.layers.Layer):
    """Simplified dense layer computing ``activation(X @ kernel + bias)``."""

    def __init__(self, units, activation=None, **kwargs):
        """Create the layer.

        Args:
            units: number of output units.
            activation: anything accepted by ``keras.activations.get``.
            **kwargs: forwarded to ``keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the layer's variables by calling add_weight."""
        self.kernel = self.add_weight(
            name="kernel",
            shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal"
        )
        self.bias = self.add_weight(
            name="bias",
            shape=[self.units],
            initializer="zeros"
        )
        super().build(batch_input_shape)  # must be at the end. It just sets self.built = True

    def call(self, X):
        """Apply the affine transformation followed by the activation.

        Notes for variations of this pattern:
        - for multiple outputs, the function must return a list with the outputs
        - for multiple inputs, the function must take a tuple with the inputs
        - if the layer should behave differently during training and testing,
          the function should take the `training` boolean argument and use it
          to decide what to do
        """
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``.

        The input is normalized through ``tf.TensorShape`` so that plain
        tuples/lists are accepted as well as ``TensorShape`` objects.

        Notes for variations of this pattern:
        - for multiple outputs, the function must return a list with the output shapes
        - for multiple inputs, the function must take a list of batch_input_shapes
        """
        input_dims = tf.TensorShape(batch_input_shape).as_list()
        return tf.TensorShape(input_dims[:-1] + [self.units])

    def get_config(self):
        """Return the parent config extended with ``units`` and the serialized activation."""
        base_config = super().get_config()
        return {**base_config,
                "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

In [None]:
# make a toy layer with multiple inputs and outputs
class MyMultiLayer(keras.layers.Layer):
    """Toy layer taking a pair of inputs and returning three outputs."""

    def call(self, X):
        """Return ``[X1 + X2, X1 * X2, X1 / X2]`` for the input pair ``X = (X1, X2)``."""
        # Python 3 does not allow tuple parameters in a signature,
        # so the pair is unpacked inside the body instead.
        X1, X2 = X
        return [X1 + X2, X1 * X2, X1 / X2]

    def compute_output_shape(self, batch_input_shape):
        """Return one output shape per output, given the pair of input shapes."""
        batch_input_shape_1, batch_input_shape_2 = batch_input_shape
        # or something that makes more sense (i.e., handles broadcasting rules)
        return [batch_input_shape_1, batch_input_shape_1, batch_input_shape_1]


In [None]:
# make a toy layer that adds gaussian noise only during training
class MyGaussianNoise(keras.layers.Layer):
    """Toy layer that adds Gaussian noise to its input during training only."""

    def __init__(self, stddev, **kwargs):
        """Create the layer.

        Args:
            stddev: standard deviation of the noise added in training mode.
            **kwargs: forwarded to ``keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        """Return ``X`` plus noise when ``training`` is truthy, otherwise ``X`` unchanged."""
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        """The output has the same shape as the input."""
        return batch_input_shape

    def get_config(self):
        """Return the parent config extended with ``stddev``.

        ``stddev`` is a required constructor argument, so it has to be part
        of the config for the layer to be re-created from it.
        """
        base_config = super().get_config()
        return {**base_config, "stddev": self.stddev}

In [None]:
# Make a custom layer of layers
class ResidualBlock(keras.layers.Layer):
    """Stack of Dense layers whose output is added back to the block's input."""

    def __init__(self, n_layers, n_neurons, **kwargs):
        """Create the block.

        Args:
            n_layers: number of Dense layers in the block.
            n_neurons: number of units in each Dense layer.
            **kwargs: forwarded to ``keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        # kept so that get_config() can report the constructor arguments
        self.n_layers = n_layers
        self.n_neurons = n_neurons
        self.hidden = [keras.layers.Dense(n_neurons,
                                          activation="relu",
                                          kernel_initializer="he_normal")
                       for _ in range(n_layers)]

    def call(self, inputs):
        """Run ``inputs`` through the hidden layers and add ``inputs`` to the result.

        The final addition requires the hidden output and ``inputs`` to have
        compatible shapes.
        """
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

    def get_config(self):
        """Return the parent config extended with ``n_layers`` and ``n_neurons``."""
        base_config = super().get_config()
        return {**base_config,
                "n_layers": self.n_layers,
                "n_neurons": self.n_neurons}

In [None]:
# make a custom model
class ResidualRegressor(keras.Model):
    """Custom model: a Dense layer, two residual blocks, and a Dense output layer."""

    def __init__(self, output_dim, **kwargs):
        """Create the sub-layers; ``output_dim`` is the number of output units."""
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(
            30, activation="elu", kernel_initializer="he_normal"
        )
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Forward pass: hidden1, then block1 applied four times, then block2, then out."""
        features = self.hidden1(inputs)
        # The same block1 instance is reused four times. Each residual block
        # adds its own input to the output of its hidden layers (a skip connection).
        for _ in range(4):
            features = self.block1(features)
        return self.out(self.block2(features))


In [None]:
# make a model that includes the loss of an auxiliary output to the total loss
class ReconstructingRegressor(keras.Model):
    """Regressor that adds an auxiliary input-reconstruction loss to the main loss."""

    def __init__(self, output_dim, **kwargs):
        """Create five hidden Dense layers and the output layer."""
        super().__init__(**kwargs)
        self.hidden = [
            keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal")
            for _ in range(5)
        ]
        self.out = keras.layers.Dense(output_dim)

    def build(self, batch_input_shape):
        """Create the reconstruction layer once the number of input features is known."""
        # one unit per input feature: this layer tries to rebuild the inputs
        # from the activations of the last hidden layer
        self.reconstruct = keras.layers.Dense(batch_input_shape[-1])
        super().build(batch_input_shape)

    def call(self, inputs):
        """Return the main output and register the reconstruction loss via add_loss."""
        activations = inputs
        for hidden_layer in self.hidden:
            activations = hidden_layer(activations)
        # mean squared difference between the reconstruction and the inputs
        recon_error = self.reconstruct(activations) - inputs
        recon_loss = tf.reduce_mean(tf.square(recon_error))
        # total loss is the main loss plus 0.05 times the reconstruction loss
        self.add_loss(0.05 * recon_loss)
        return self.out(activations)


# Computation Graphs

In [4]:
# make function
def cube(x):
    """Return ``x`` raised to the third power."""
    return pow(x, 3)

cube(2)

8

In [5]:
# convert the function into a tensorflow function
tf_cube = tf.function(cube)
tf_cube(2)

Metal device set to: Apple M1

systemMemory: 16.00 GB
maxCacheSize: 5.33 GB



2023-02-23 13:15:34.650178: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-02-23 13:15:34.650665: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)
2023-02-23 13:15:34.670282: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [6]:
# alternatively
@tf.function
def tf_cube(x):
    """TensorFlow-function version of the cube: returns ``x`` to the third power."""
    cubed = x**3
    return cubed

tf_cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [8]:
# access the original Python function
tf_cube.python_function(2)

8