# Session 15

## Gradient Descent

It is an optimisation technique used to iteratively find the most appropriate
weights for the model.

First you define your architecture, and choose a loss metric (error/loss
function), then you do feed forward.
Then you find the error/loss, and feed it backward into your network, through
the help of calculus (gradient), and chain rule.

To do so, we define the metrics that we might use:

---

### Includes


In [1]:
# just for clean code, best practices
from __future__ import annotations
# we will use numpy for implementation
import numpy as np

# again, just for clean code
from typing import Callable

# set a seed for better debugging
np.random.seed(42)


### Defining loss metrics

The error functions that might be used to tell how well a model performs


In [2]:
# cost functions
# more here: https://builtin.com/machine-learning/common-loss-functions


def mean_absolute_error(labels: np.ndarray, predictions: np.ndarray) -> float:
    """returns the sum of squared errors

    Parameters
    ----------

    Returns
    -------
    out: float

    """
    m = len(labels)
    return np.sum(np.abs(labels - predictions)) / m


def mean_squared_error(labels: np.ndarray, predictions: np.ndarray) -> float:
    """returns the mean squared error

    Parameters
    ----------

    Returns
    -------
    out: float

    """
    m = len(labels)
    return np.sum((labels - predictions) ** 2) / (2 * m)


def root_mean_squared_error(labels: np.ndarray, predictions: np.ndarray) -> float:
    """returns the root mean squared error

    Parameters
    ----------

    Returns
    -------
    out: float
    """
    return np.sqrt(mean_squared_error(labels, predictions))


def r2_score(labels: np.ndarray, predictions: np.ndarray) -> float:
    """returns the coefficient of determination

    Parameters
    ----------

    Returns
    -------
    out: float
    """
    mean = np.mean(labels)
    ss_res = np.sum((labels - predictions) ** 2)
    ss_tot = np.sum((labels - mean) ** 2)
    return 1 - (ss_res / ss_tot)


def log_loss(labels: np.ndarray, predictions: np.ndarray) -> float:
    """returns the log loss error

    Parameters
    ----------

    Returns
    -------
    out: float
    """
    labels = np.tile(labels, (len(labels), 1))
    return np.sum(np.log(predictions) @ labels)


> Above are the right way to define metrics, but since we need something simple for today's tutorial, we will define simpler metrics below


In [3]:
# simplified for exercise
def mean_absolute_error(error: np.ndarray) -> float:
    """returns the sum of squared errors

    Parameters
    ----------

    Returns
    -------
    out: float

    """
    m = len(error)
    return np.sum(np.abs(error)) / m


def mean_squared_error(error: np.ndarray) -> float:
    """returns the mean squared error

    Parameters
    ----------

    Returns
    -------
    out: float

    """
    m = len(error)
    return np.sum(error**2) / (2 * m)


def root_mean_squared_error(error: np.ndarray) -> float:
    """returns the root mean squared error

    Parameters
    ----------

    Returns
    -------
    out: float
    """
    return np.sqrt(mean_squared_error(error))


### Activation functions

We remember that neurons have the useful property of including non-linearity in 
calculations that allow neural networks to capture more complex features that
other methods might not be able to capture.

Here we define the most common activation functions


In [4]:
# activation functions
def relu(z: float | np.ndarray) -> float | np.ndarray:
    """Rectified Linear Unit

    Parameters
    ----------

    Returns
    -------
    out: float, np.ndarray
    """
    if type(z) == float:
        return max(0, z)
    zeros = np.zeros(z.shape)
    return np.max(np.c_[z, zeros], axis=1)


def leaky_relu(z: float | np.ndarray, slope: float = 0.1) -> np.ndarray:
    """Leaky Rectified Linear Unit

    Parameters
    ----------

    Returns
    -------
    out: np.ndarray
    """
    return np.max(np.c_[z, slope * z], axis=1)


def sigmoid(z: float | list | np.ndarray) -> float | np.ndarray:
    """Sigmoid, the logistic function

    Parameters
    ----------

    Returns
    -------
    out: float, np.ndarray
    """
    return 1 / (1 + np.exp(-z))


def softmax(z: np.ndarray) -> np.ndarray:
    """Softmax

    Parameters
    ----------

    Returns
    -------
    out: np.ndarray
    """
    return np.exp(z) / np.sum(np.exp(z))


# NOTE: tanh could be provided by numpy through np.tanh


### Derivatives

Since we are going to need gradients and chain rule, it is best to define the
derivatives of the activation function and the loss-metric

---

We start w/ derivatives of activation functions


In [5]:
# derivative of activation functions
class UndefinedDerivative(Exception):
    """Exception for when a function has no defined derivative at a certain
    point"""

    pass


def relu_prime(z: float | np.ndarray) -> float | np.ndarray:
    """

    Parameters
    ----------

    Returns
    -------
    out: float, np.ndarray
    """
    if type(z) == float:
        if z == 0:
            raise UndefinedDerivative("Undefined derivative")
        return 0 if z < 0 else 1
    if np.any(z == 0):
        raise UndefinedDerivative("Undefined derivative")
    return (z > 0).astype(np.int64)


def leaky_relu_prime(z: float | np.ndarray, slope: float) -> float | np.ndarray:
    """

    Parameters
    ----------

    Returns
    -------
    out: float, np.ndarray
    """
    if type(z) == float:
        if z == 0:
            raise UndefinedDerivative("Undefined derivative")
        return slope if z < 0 else 1
    if np.any(z == 0):
        raise UndefinedDerivative("Undefined derivative")
    return ((z < 0) * (slope - 1)) + 1


def sigmoid_prime(z: float | np.ndarray) -> float | np.ndarray:
    """

    Parameters
    ----------

    Returns
    -------
    out: float, np.ndarray
    """
    sigma = sigmoid(z)
    return sigma * (1 - sigma)


def softmax_prime(z: np.ndarray) -> np.ndarray:
    """

    Parameters
    ----------

    Returns
    -------
    out: np.ndarray
    """
    e_z = np.exp(z)
    total = np.sum(e_z)
    return (e_z * (total - e_z)) / (total**2)


def tanh_prime(z: float | np.ndarray) -> float | np.ndarray:
    """

    Parameters
    ----------

    Returns
    -------
    out: float, np.ndarray
    """
    return 1 - (np.tanh(z) ** 2)


> For implementation purposes, let's define a mapping between activations & their derivatives


In [6]:
ACTIVATION_PRIMES = {
    relu: relu_prime,
    leaky_relu: leaky_relu_prime,
    sigmoid: sigmoid_prime,
    softmax: softmax_prime,
    np.tanh: tanh_prime,
}


Now we define derivative for loss-metrics

> the o/p of these derivatives is what was referred to in the Udacity classroom as **error term**, not the complete derivative


In [18]:
# derivative of cost simplified cost functions
def mean_absolute_error_prime(
    error=None,
    activation_prime: float | Callable | np.ndarray = 1.0,
    weighted_features: np.ndarray = None,
) -> np.ndarray:
    """ """
    m = 1
    if error:
        m = len(error)
    if callable(activation_prime):
        activation_prime = activation_prime(weighted_features)
    return -activation_prime / m


def mean_squared_error_prime(
    error: np.ndarray,
    activation_prime: float | Callable | np.ndarray = 1.0,
    weighted_features: np.ndarray = None,
) -> np.ndarray:
    """ """
    # m = len(error)
    m = len(error) if isinstance(error, np.ndarray) else 1
    if callable(activation_prime):
        activation_prime = activation_prime(weighted_features)
    return (error * activation_prime) / m


This here are some random data & random weights for the simplest architecture, an
i/p layer, a 1-neuron o/p layer

> this is just for illustration purposes, the same way the classroom started simple, then drove more complex.

> Could be a good exercise to first generalise for multi-output, then generalise for `DNN`s


In [19]:
# learning rate
lr = 0.1
# features
x = np.array(list(range(1, 5)))
# labels
y = np.array((0.5))
# initial weights
w = np.array([0.5, -0.5, 0.3, 0.1])
# TODO: try to update the code to define random weights if not provided


Now we will define our gradient descent, note that while we are deriving the GD
in classes, it is still the very same gradient descent code from the classroom.


In [20]:
# A good OOP practice to define interfaces/abstract classes
class AbstractGradientDescent:
    def __init__(self, *args, **kwargs) -> AbstractGradientDescent:
        pass

    def backward(self, error: np.ndarray) -> None:
        pass

    def forward(self, features: np.ndarray) -> np.ndarray:
        pass

    def step(self) -> None:
        pass

    def zero_grad(self) -> None:
        pass


In [21]:
# The Vanilla gradient descent we all know and love
class GradientDescent(AbstractGradientDescent):
    def __init__(self, *args, **kwargs) -> GradientDescent:
        # learning rate
        self._lr = kwargs.get("learning_rate", 0.01)
        self._w = kwargs.get("weights")
        self._dw = np.zeros(self._w.shape)
        # in case no activation is set, act like no non-linearity included
        # NOTE: when expanding the logic for MLP/DNN, this should be per-layer
        #   configuration
        self._activation = kwargs.get("activation", lambda x: x)
        # loss metric
        self._metric_prime = kwargs.get("loss_fn", mean_squared_error_prime)
        # derivative of activation could be passed as an argument, or extracted
        #   from the global dictionary defined
        self._activation_prime = kwargs.get(
            "activation_prime",
            ACTIVATION_PRIMES.get(self._activation, lambda x: x),
        )
        # define elements needed for zero_grad, and step functions
        self._hidden = None
        self._x = None

    def backward(self, error: np.ndarray) -> None:
        # calculate the gradient of the o/p wrt the weights
        # self._dw = error * self._activation_prime(self._hidden)
        self._dw = self._metric_prime(error, self._activation, self._hidden)

    def forward(self, features: np.ndarray) -> np.ndarray:
        # keep the input features for the complete gradient calculation
        self._x = features
        # keep the hidden layer o/p for use with gradient
        self._hidden = features @ self._w
        # the forward step, activation on hidden layer o/p
        return self._activation(self._hidden)

    def step(self) -> None:
        # update the weights
        self._w += self._lr * self._dw * self._x

    def zero_grad(self) -> None:
        # clear the gradient calculated before
        self._dw = np.zeros(self._w.shape)
        if self._hidden is not None:
            self._hidden = np.zeros(self._hidden.shape)
        self._x = None


In [24]:
optimiser = GradientDescent(learning_rate=.1, weights=w.copy(), activation=sigmoid)
epochs = 10

for _ in range(epochs):
    optimiser.zero_grad()
    p = optimiser.forward(x)
    e = y - p
    optimiser.backward(e)
    print(p, e)
    optimiser.step()


0.6899744811276125 -0.1899744811276125
0.6003125016027219 -0.10031250160272187
0.5562880935734273 -0.056288093573427345
0.5329953867405707 -0.032995386740570676
0.5198431889101734 -0.019843188910173448
0.5121147349894629 -0.012114734989462916
0.5074634514314771 -0.007463451431477086
0.5046233024344993 -0.004623302434499266
0.5028736329575841 -0.002873632957584138
0.5017898512687841 -0.0017898512687840595


## AdaGrad

There are ways to improve how quick your model should learn amongst those are:

- Adaptive Learning Rate, and
- Momentum-based Gradient Descent (later on that)

The adaptive learning rate is simply a learning rate that changes according to
some conditions.

For `AdaGrad` (Adaptive Gradient), the condition is that the sparse features,
which lead to slow learning rate, should have bigger learning rate than dense
features.

For more in detail on AdaGrad, check this [article](https://medium.com/konvergen/an-introduction-to-adagrad-f130ae871827)

Eventually, that leads to usage of a slightly different update step

$$
W^{\prime} = W + \frac{\alpha}{\sqrt{\upsilon_t}+\varepsilon}\nabla{W}
\\
\upsilon_t=\upsilon_{t-1} + (\nabla{W})^2; \upsilon_0=0
$$

where &alpha; is the initial learning rate defined by us,
&epsilon; is a small number ($10^{-8}$) to avoid division by $0$


In [None]:
class AdaGrad(GradientDescent):
    pass


Let's put that classifier to the test


In [None]:
optimiser = AdaGrad(learning_rate=10e-3, weights=w.copy(), activation=sigmoid)

for _ in range(epochs):
    optimiser.zero_grad()
    p = optimiser.forward(x)
    e = y - p
    optimiser.backward(e)
    print(p, e)
    optimiser.step()


## RMSProp

AdaGrad helped w/ the sparse features and their learning rate, but that led to
some unintentional issue, the **vanishing gradient**, which is the gradient
arrives at some point to $0$, so the weights stop updating afterwards, and that
might happen before the minima.

For `RMSProp` (Root Mean Square Propagation), we find a way to mitigate the
effect of decaying learning rate, by throttling the rapid growth of
$\upsilon_t$. That is done through the concept of
_Exponentially Weight Moving Average_ [EWMA](https://medium.com/mlearning-ai/exponentially-weighted-average-5eed00181a09)

More details about RMSProp are in this [article](https://medium.com/optimization-algorithms-for-deep-neural-networks/rmsprop-fb992098fa6e)

RMSProp keeps the same update equation, but alters the $\upsilon_t$ equation

$$
\upsilon_t = \beta\upsilon_{t-1} + (1-\beta)(\nabla{W})^2
$$


In [None]:
class RMSProp(AdaGrad):
    pass


Time to put this optimiser to the test


In [None]:
optimiser = RMSProp(learning_rate=5e-4, weights=w.copy(), activation=sigmoid)

for _ in range(epochs):
    optimiser.zero_grad()
    p = optimiser.forward(x)
    e = y - p
    optimiser.backward(e)
    print(p, e)
    optimiser.step()


## AdaM

As mentioned before, there is a method that allows for improving learning by
leveraging _momentum_. It is similar to the momentum from mechanics.

In simple words, as long as you are sliding down a hill, you gain momentum, once
you reach a slightly less steep, or even uphill, this momentum drops.

This might be helpful to avoid getting stuck at a local minimum.

Now we have another way to help with optimisation, which is mixing both momentum
& adaptive learning rate

More info on AdaM in this [article](https://medium.com/analytics-vidhya/a-complete-guide-to-adam-and-rmsprop-optimizer-75f4502d83be)

The new equations are quite different, 1st the weights are now updated by the
means of the momentum, which in turn is based off the gradient, and still using
the adaptive learning rate.

$$
m_t=\beta_1\upsilon_{t-1}+(1-\beta_1)(\nabla{W})
\\
\upsilon_t=\beta_2\upsilon_{t-1}+(1-\beta_2)(\nabla{W})^2
\\
W^{\prime}=W + \frac{\alpha}{\sqrt{\upsilon_t}+\varepsilon}m_t
$$

In [None]:
class AdaM(RMSProp):
    pass


One final test, the adam optimiser


In [None]:
optimiser = AdaM(learning_rate=.01, weights=w.copy(), activation=sigmoid)

for _ in range(epochs):
    optimiser.zero_grad()
    p = optimiser.forward(x)
    e = y - p
    optimiser.backward(e)
    print(p, e)
    optimiser.step()
