In [3]:
import tensorflow as tf
from sympy.physics.units.systems.mksa import units
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import fetch_california_housing

# Load and scale data
X, y = fetch_california_housing(return_X_y=True)
X = StandardScaler().fit_transform(X)

# Split into train, validation, and test sets
X_main, X_test, y_main, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_main, y_main, test_size=0.2, random_state=42)

# Custom Huber loss function
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1.0
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

# Build the model
model = keras.models.Sequential([
    layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]),
    layers.Dense(30, activation="relu"),
    layers.Dense(1)
])

# Compile the model
model.compile(loss=huber_fn, optimizer="nadam")

# Train the model
history = model.fit(X_train, y_train, epochs=20, validation_data=(X_val, y_val))


2025-06-14 12:43:40.169921: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Epoch 1/20


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2ms/step - loss: 0.5312 - val_loss: 0.2153
Epoch 2/20
[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - loss: 0.1883 - val_loss: 0.1888
Epoch 3/20
[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - loss: 0.1707 - val_loss: 0.1755
Epoch 4/20
[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - loss: 0.1644 - val_loss: 0.1656
Epoch 5/20
[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - loss: 0.1506 - val_loss: 0.1641
Epoch 6/20
[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - loss: 0.1552 - val_loss: 0.1607
Epoch 7/20
[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - loss: 0.1488 - val_loss: 0.1561
Epoch 8/20
[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step - loss: 0.1427 - val_loss: 0.1531
Epoch 9/20
[1m413/413[0m [32m━━━━━━━━━━━━━━━━━━━

In [4]:
def my_softplus(z):
    return tf.math.log(1.0 + tf.exp(z))
def my_initializer(shape, dtype = tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev = stddev, dtype = dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))
def my_positive_weights(weights):
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

layer = tf.keras.layers.Dense(1, activation = my_softplus,
                              kernel_initializer=my_initializer,
                              kernel_regularizer=my_l1_regularizer,
                              kernel_constraint=my_positive_weights)

In [5]:
class MyL1Regularizer(tf.keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return{"factor": self.factor}

In [6]:
precision = tf.keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])



<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [7]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [8]:
precision.variables

[<Variable path=precision/true_positives, shape=(1,), dtype=float32, value=[4.]>,
 <Variable path=precision/false_positives, shape=(1,), dtype=float32, value=[4.]>]

In [9]:
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * tf.abs(error) - threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

In [10]:
class HuberMetric(tf.keras.metrics.Metric):
    def __init__(self, threshold = 1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer = 'zeros')
        self.count = self.add_weight('count', initializer = 'zeros')
    def update_state(self, y_true, y_pred, sample_weight = None):
        sample_metrics = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(sample_metrics))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        return self.total / self.count

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
    

In [11]:
tf.keras.layers.Flatten

keras.src.layers.reshaping.flatten.Flatten

In [12]:
exponential_layer = tf.keras.layers.Lambda(lambda x: tf.exp(x))

In [13]:
class MyDense(tf.keras.layers.Layer):
    def __init__(self, units, activation = None, **kwargs):
        self.units = units
        self.activation = tf.keras.activations.get(activation)
    def build(self, batch_input_shape):
        self.kernel = self.add_weight(
            name = 'kernel', shape = [batch_input_shape[-1], self.units],
            initializer = "glorot_normal")
        self.bias = self.add_weight(
            name = "bias", shape = [self.units], initializer = 'zeros')

    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": tf.keras.activations.serialize(self.activation)}

In [14]:
class MyMultiLayer(tf.keras.layers.Layer):
    def call(self, X):
        X1, X2 = X
        return X1 + X2, X1 * X2, X1 / X2

In [17]:
class MyGaussianNoise(tf.keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

In [18]:
class MyGaussianNoise(tf.keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        super().init(**kwargs)
        self.stddev = stddev

    def call(self, X, training =  False):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev = self.stddev)
            return X + noise
        else:
            return X

In [24]:
class ResidualBlock(tf.keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = [tf.keras.layers.Dense(n_neurons, activation = 'relu',
                                             kernel_initializer='he_normal')]
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

class ResidualRegressor(tf.keras.Model):
    def __init__(self, output_dim, **kwargs):

        super().__init__(**kwargs)
        self.hidden1 = tf.keras.layers.Dense(30, activation = "relu",
                                                 kernel_initializer='he_normal')
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = tf.keras.layers.Dense(output_dim)
    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

In [25]:
class ReconstructingRegressor(tf.keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [tf.keras.layers.Dense(30, activation = 'relu',
                       kernel_initializer = 'he_normal') for _ in range(5)]

    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstruct = tf.keras.layers.Dense(n_inputs)


    def call(self, inputs, training = False):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))

        self.add_loss(0.05 * recon_loss)
        if training:
            result = self.reconstruction_mean(recon_loss)
            self.add_metric(result)
        return self.out(Z)

In [28]:
def f(w1, w2):
    return 3 * w1 **2 + 2 * w1 * w2

In [30]:
w1, w2 = 5, 3
eps = 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps

(f(w1, w2 + eps) - f(w1, w2)) / eps

10.000000003174137

In [38]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2])
gradients
with tf.GradientTape() as tape:
    z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
del tape

RuntimeError: A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)

In [36]:
with tf.GradientTape() as tape:
    z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
del tape

RuntimeError: A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)

In [41]:
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
del tape  # now manual cleanup is required


In [43]:
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

    gradients = tape.gradient(z, [c1, c2])

prod_: 25502500
sum_: 510050


In [43]:
def f(w1, w2):
    return 3 * w1 **2 + tf.stop_gradient(2 * w1 * w2)
with tf.GradientTape() as tape:
    z = f(w1, w2)
gradients = tape.gradient(z, w1, w2)

x = tf.Variable(1e-50)
with tf.GradientTape() as tape:
    z = tf.sqrt(x)
tape.gradient(z, [x])


[<tf.Tensor: shape=(), dtype=float32, numpy=inf>]

In [45]:
def my_softplus(z):
    return tf.math.log(1 + tf.exp(-tf.abs(z))) + tf.maximum(0., z)

@tf.custom_gradient
def my_softplus(z):
    def my_softplus_gradients(grads):
        return grads * (1 - 1 / (1 + tf.exp(z))) + tf.maximum(0., z )
    result = tf.math.log(1 + tf.exp(-tf.abs(z))) + tf.maximum(0., z)
    return result, my_softplus_gradients

In [48]:
l2_reg = tf.keras.regularizers.l2(0.05)
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(30, activation = 'relu',
                          kernel_initializer = 'he_normal',
                          kernel_regularizer = l2_reg),
    tf.keras.layers.Dense(1, kernel_regularizer = l2_reg)
])


In [58]:
import numpy as np
def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]


In [59]:
def print_status_bar(step, total, loss, metrics = None):
    metrics = "-".join([f"{m.name}: {m.result():.4f}" for m in [loss]+ (metrics or [])] )

    end = "" if step < total else "\n"
    print(f"\r{step}/{total} -" + metrics, end = end)

In [60]:
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.SGD(learning_rate = 0.01)
loss_fn = tf.keras.losses.MeanSquaredError()
mean_loss = tf.keras.metrics.Mean(name = "mean_loss")

metrics = [tf.keras.metrics.MeanAbsoluteError()]

In [66]:
for epoch in range(1, n_epochs + 1):
    print(f"Epoch {epoch} / {n_epochs}")

    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train, y_train)

        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            total_loss = tf.add_n([main_loss] + model.losses)

        gradients = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        mean_loss.update_state(total_loss)
        for metric in metrics:
            metric.update_state(y_batch, y_pred)

        print_status_bar(step, n_steps, mean_loss, metrics)

    # Reset metrics at the end of each epoch
    for metric in [mean_loss] + metrics:
        metric.reset_state()


Epoch 1 / 5
412/412 -mean_loss: 0.6340-mean_absolute_error: 0.5106
Epoch 2 / 5
412/412 -mean_loss: 0.6159-mean_absolute_error: 0.5077
Epoch 3 / 5
412/412 -mean_loss: 0.6403-mean_absolute_error: 0.5192
Epoch 4 / 5
412/412 -mean_loss: 0.6362-mean_absolute_error: 0.5176
Epoch 5 / 5
412/412 -mean_loss: 0.6365-mean_absolute_error: 0.5177


In [67]:
for variable in model.variables:
    if variable.constraint is not None:
        variable.assign(variable.constraint(variable))

In [68]:
def cube(x):
    return x**3
cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [72]:
tf_cube = tf.function(cube)
tf_cube(3.)

<tf.Tensor: shape=(), dtype=float32, numpy=27.0>

In [79]:

@tf.function
def tf_cube(x):
    return x**3

tf_cube(3)

tf.cube.python_function(2)

8

In [80]:
@tf.function
def loop_demo(x):
    result = 0
    for i in tf.range(x):
        result += i
    return result

print(tf.autograph.to_code(loop_demo.python_function))


def tf__loop_demo(x):
    with ag__.FunctionScope('loop_demo', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
        do_return = False
        retval_ = ag__.UndefinedReturnValue()
        result = 0

        def get_state():
            return (result,)

        def set_state(vars_):
            nonlocal result
            (result,) = vars_

        def loop_body(itr):
            nonlocal result
            i = itr
            result = ag__.ld(result)
            result += i
        i = ag__.Undefined('i')
        ag__.for_stmt(ag__.converted_call(ag__.ld(tf).range, (ag__.ld(x),), None, fscope), None, loop_body, get_state, set_state, ('result',), {'iterate_names': 'i'})
        try:
            do_return = True
            retval_ = ag__.ld(result)
        except:
            do_return = False
            raise
        return fscope.ret(retval_, do_return)



In [81]:
tf.range(10)

<tf.Tensor: shape=(10,), dtype=int32, numpy=array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)>

In [83]:
tf.constant(np.arange(10))

<tf.Tensor: shape=(10,), dtype=int64, numpy=array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])>

In [106]:
x=tf.constant([[1, 2], [3, 4]])
w=tf.Variable(tf.random.normal([2, 2]))
sparce = tf.sparse.SparseTensor(indices = [[0, 0]], values = [1], dense_shape =[3, 3])
ragged = tf.ragged.constant([[1, 2], [3], [4, 5, 6]])
ta = tf.TensorArray(dtype = tf.float32, size = 3)
ds = tf.data.Dataset.from_tensor_slices(tf.constant([1, 2, 3]))

<_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

In [11]:
import tensorflow as tf

class CustomLayerNormalization(tf.keras.layers.Layer):
    def __init__(self, epsilon=1e-3, **kwargs):
        super().__init__(**kwargs)
        self.epsilon = epsilon

    def build(self, input_shape):
        # α (scale) and β (shift): shape = last feature axis
        self.alpha = self.add_weight(
            name='alpha',
            shape=(input_shape[-1],),
            dtype=tf.float32,
            initializer='ones',
            trainable=True
        )
        self.beta = self.add_weight(
            name='beta',
            shape=(input_shape[-1],),
            dtype=tf.float32,
            initializer='zeros',
            trainable=True
        )

    def call(self, inputs):
        # Compute mean μ and variance σ² over the last axis (per instance)
        mean, variance = tf.nn.moments(inputs, axes=-1, keepdims=True)
        std = tf.sqrt(variance + self.epsilon)
        normalized = (inputs - mean) / std
        return self.alpha * normalized + self.beta


In [14]:
# Dummy input: batch of 2 samples, 5 features
X = tf.random.normal(shape=(2, 5))

# Built-in LayerNormalization
keras_norm = tf.keras.layers.LayerNormalization(epsilon=1e-3)
keras_output = keras_norm(X)

# Custom LayerNormalization
custom_norm = CustomLayerNormalization(epsilon=1e-3)
custom_output = custom_norm(X)

# Check difference
print("Max absolute difference:", tf.reduce_max(tf.abs(keras_output - custom_output)).numpy())


Max absolute difference: 1.1920929e-07


In [16]:
import tensorflow as tf

# Load the dataset
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

# Normalize pixel values to [0, 1]
X_train = X_train.astype("float32") / 255.0
X_test = X_test.astype("float32") / 255.0

# If using with Dense layers, flatten the images:
X_train_flat = X_train.reshape(-1, 28 * 28)
X_test_flat = X_test.reshape(-1, 28 * 28)




array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]], dtype=float32)