In [None]:
import tensorflow as tf
from tensorflow import keras

import numpy as np
import pandas as pd

# Training hyperparameters shared by every model trained in this notebook.
EPOCHS, BATCH_SIZE = 3, 32

# Fix the global random seed up front so repeated runs give the same numbers
# (per the Keras docs this call seeds Python, NumPy and TensorFlow together).
SEED = 42
tf.keras.utils.set_random_seed(SEED)

In [None]:
# Synthetic, noise-free linear dataset: y = BIAS + sum_j x[:, j] * BETAS[0, j]

N_SAMPLES = 10_000
N_FEATURES = 10

# The draw order (x, then BIAS, then BETAS) determines which random numbers
# each quantity receives, so it is kept fixed.
x = np.random.normal(size=(N_SAMPLES, N_FEATURES))
BIAS = np.random.normal()
BETAS = np.random.normal(size=(1, N_FEATURES))
# BETAS broadcasts across the rows of x; keepdims=True keeps y as a column of
# shape (N_SAMPLES, 1) instead of collapsing it to a flat vector.
y = BIAS + np.sum(x * BETAS, axis=1, keepdims=True)

In [None]:
class LinearRegression(keras.Model):
  """Linear regression written as a Keras model wrapping one single-unit Dense layer.

  No training logic is defined here; the model relies on whatever `fit`
  implementation it inherits from `keras.Model`.
  """

  def build(self, input_shapes):
    """Create the Dense layer; `input_shapes` is accepted but not used here."""
    zero_init_dense = keras.layers.Dense(units=1, kernel_initializer="zeros")
    self.layer = zero_init_dense

  def call(self, input_data, training=None):
    """Return the Dense layer's output for `input_data`.

    `training` is part of the signature but is not forwarded to the layer.
    """
    prediction = self.layer(input_data)
    return prediction

# Baseline run: plain SGD on mean squared error, trained by the inherited fit().
linear_model = LinearRegression()
linear_model.compile(
    loss=keras.losses.MSE,
    optimizer=keras.optimizers.SGD(),
)
linear_model.fit(x=x, y=y, batch_size=BATCH_SIZE, epochs=EPOCHS)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7ec774b5b880>

In [None]:
class LinearRegression(keras.Model):
  """Linear regression as a Keras model with a hand-written training step.

  The model is a single one-unit Dense layer. `train_step` is overridden so the
  forward pass, loss, gradient computation and weight update for each batch are
  spelled out explicitly, while the loss function and optimizer are still the
  ones supplied to `compile`.
  """

  def build(self, input_shapes):
    """Create the Dense layer; its kernel is initialised to zeros."""
    self.layer = keras.layers.Dense(1, kernel_initializer="zeros")

  def call(self, input_data, training=None):
    """Return the Dense layer's output, forwarding the `training` flag."""
    return self.layer(input_data, training=training)

  def train_step(self, data):
    """Run one gradient-descent update on a single batch.

    Args:
      data: An `(inputs, targets)` pair for the batch.

    Returns:
      A dict holding the batch loss under the key "loss".
    """
    # Local names differ from the notebook-level `x` / `y` to avoid shadowing them.
    features, targets = data

    # Record the forward pass so the tape can differentiate the loss afterwards.
    with tf.GradientTape() as tape:
      prediction = self(features, training=True)
      # self.compiled_loss is the keras internal API for the loss passed to
      # compile(). It takes (y_true, y_pred) in that order; swapping them goes
      # unnoticed with symmetric MSE but is wrong for any asymmetric loss.
      loss = self.compiled_loss(targets, prediction)

    trainable_vars = self.trainable_variables
    gradients = tape.gradient(loss, trainable_vars)

    self.optimizer.apply_gradients(zip(gradients, trainable_vars))
    return {"loss": loss}

# Same optimizer, loss and data as the baseline run, but each batch now goes
# through the custom train_step defined on the class above.
sgd_optimizer = keras.optimizers.SGD()
linear_model = LinearRegression()
linear_model.compile(optimizer=sgd_optimizer, loss=keras.losses.MSE)
linear_model.fit(
    x,
    y,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7ec7741bdcc0>

In [None]:
# Difference between the learned parameters and the ones that generated the
# data; BETAS is stored as a row, so transpose it to line up with the kernel.
model_beta = linear_model.layer.kernel
model_bias = linear_model.layer.bias
(model_beta - BETAS.T, model_bias - BIAS)

(<tf.Tensor: shape=(10, 1), dtype=float32, numpy=
 array([[ 8.3446503e-07],
        [-2.9802322e-07],
        [ 1.1920929e-07],
        [ 0.0000000e+00],
        [-7.4505806e-08],
        [ 5.2154064e-08],
        [ 1.0728836e-06],
        [-1.1920929e-06],
        [-1.3113022e-06],
        [-5.9604645e-08]], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-1.66893e-06], dtype=float32)>)

In [None]:
from tqdm.auto import tqdm

# Now we implement the layer, the loss and the training loop ourselves; only the optimizer still comes from keras.

def mean_squared_error(y_true, y_pred):
  """Return the mean of the squared element-wise differences between the inputs."""
  residual = y_true - y_pred
  squared_residual = tf.math.square(residual)
  return tf.reduce_mean(squared_residual)

class Dense(tf.Module):
  """Affine layer `bias + inputs @ weights` built directly on tf.Variable."""

  def __init__(self, in_dim, out_dim):
    """Create a scalar bias and an (in_dim, out_dim) weight matrix, all zeros, float32."""
    super().__init__()
    self.bias = tf.Variable(0.0, dtype=tf.float32)
    zero_weights = tf.zeros(shape=(in_dim, out_dim), dtype=tf.float32)
    self.weights = tf.Variable(zero_weights, dtype=tf.float32)

  def __call__(self, input_data, training=None):
    """Apply the affine map to `input_data`; `training` is accepted but ignored."""
    projection = input_data @ self.weights
    return self.bias + projection

class LinearRegressionRaw(tf.Module):
  """Linear regression with a hand-written, Keras-free training loop.

  Wraps a single `Dense(N_FEATURES, 1)` layer and provides `compile` / `fit`
  methods that mirror the names used by the Keras models in this notebook.
  """

  def __init__(self):
    # Let tf.Module set up its own state before attributes are attached.
    super().__init__()
    self.layer = Dense(N_FEATURES, 1)

  def __call__(self, input_data, training=None):
    """Return the layer's prediction for `input_data`, forwarding `training`."""
    return self.layer(input_data, training=training)

  def compile(self, optimizer, loss):
    """Store the optimizer and the loss function used by `step`.

    Args:
      optimizer: Object exposing `apply_gradients(grads_and_vars)`.
      loss: Callable invoked as `loss(y_true, y_pred)`.
    """
    self.optimizer = optimizer
    self.loss = loss

  def step(self, x, y):
    """Run one optimisation step on a single batch and return its loss.

    Args:
      x: Batch of inputs.
      y: Batch of targets.

    Returns:
      The value returned by the loss function for this batch.
    """
    # use tensorflow API to monitor gradients of operations
    with tf.GradientTape() as tape:
      # get prediction (flagged as a training pass, like the Keras train_step)
      prediction = self(x, training=True)
      # calculate loss
      loss = self.loss(y, prediction)
    # calculate gradients
    grads = tape.gradient(loss, self.trainable_variables)
    # apply backprop
    self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
    return loss

  def fit(self, x, y, epochs, batch_size):
    """Train with mini-batches, reshuffling the data before every epoch.

    Args:
      x: Array of inputs, indexable by an integer index array.
      y: Array of targets, indexed the same way as `x`.
      epochs: Number of passes over the data.
      batch_size: Number of samples per batch; the last batch may be smaller.
    """
    n_samples = len(x)
    # Ceiling division. `n_samples // batch_size + 1` adds an empty extra batch
    # whenever n_samples is a multiple of batch_size; the mean over that empty
    # batch is NaN, and the resulting NaN gradients overwrite the weights.
    n_batches = (n_samples + batch_size - 1) // batch_size
    data_idxs = np.arange(n_samples)
    for _ in range(epochs):
      # loss container used to report the running average on the progress bar
      losses = []
      # shuffle the data between each epoch
      np.random.shuffle(data_idxs)
      shuffle_x = x[data_idxs]
      shuffle_y = y[data_idxs]
      # the context manager closes the progress bar even if a step raises
      with tqdm(total=n_batches) as progress:
        # call model over each batch
        for batch in range(n_batches):
          start = batch * batch_size
          stop = start + batch_size
          # index into the data and convert to a tensor
          x_batch = tf.convert_to_tensor(shuffle_x[start:stop], dtype=tf.float32)
          y_batch = tf.convert_to_tensor(shuffle_y[start:stop], dtype=tf.float32)
          # call the model
          loss = self.step(x_batch, y_batch)
          # update the progress bar
          losses.append(np.average(loss))
          progress.set_postfix(loss=np.average(losses))
          progress.update(1)


# okay, we used keras optimizer, but next time we can implement gradient descent from scratch if you want!
raw_optimizer = keras.optimizers.SGD()

linear_model_raw = LinearRegressionRaw()
linear_model_raw.compile(optimizer=raw_optimizer, loss=mean_squared_error)
linear_model_raw.fit(
    x,
    y,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
)

  0%|          | 0/313 [00:00<?, ?it/s]

  0%|          | 0/313 [00:00<?, ?it/s]

  0%|          | 0/313 [00:00<?, ?it/s]

In [None]:
# Read the parameters by attribute instead of unpacking trainable_variables:
# that sequence is ordered by attribute name ("bias" before "weights"), so the
# unpacking would silently swap if an attribute were renamed or added.
model_bias = linear_model_raw.layer.bias
model_beta = linear_model_raw.layer.weights
# Difference from the parameters that generated the data (BETAS is a row vector).
(model_beta - BETAS.T, model_bias - BIAS)

(<tf.Tensor: shape=(10, 1), dtype=float32, numpy=
 array([[ 9.5367432e-07],
        [-4.1723251e-07],
        [ 2.9802322e-07],
        [ 5.9604645e-08],
        [-2.9802322e-08],
        [ 7.4505806e-09],
        [ 1.1920929e-06],
        [-9.5367432e-07],
        [-1.3113022e-06],
        [-5.9604645e-08]], dtype=float32)>,
 <tf.Tensor: shape=(), dtype=float32, numpy=-1.66893e-06>)