[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/UNF-AIRO/ml-quickstart/blob/main/exercises/01_Function_Fitting_Solution.ipynb)

In [None]:
%pip install matplotlib numpy torch
%matplotlib notebook

In [2]:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np

plt.rcParams["animation.html"] = "jshtml"

# Function Fitting

Many machine learning workflows follow these steps:
- Collect and clean a dataset of `(input, output)` pairs
- Define a model - a function mapping data from input to output format
- Repeatedly:
  - Sample `(input, output)` from the dataset
  - Pass `input` to the model and get `model_output`
  - Automatically adjust internal model parameters so `model_output` would be
  closer to `output` if it were given a similar `input` in the future

As an example, let's generate a dataset of I/O pairs from the function
$f(x) = 3x^2 - 4x + 1$.

In [3]:
np.random.seed(0)

dataset_x = np.random.randn(20)
dataset_y = 3 * dataset_x**2 - 4 * dataset_x + 1

plt.scatter(dataset_x, dataset_y)
plt.show()

<IPython.core.display.Javascript object>

Assume that we're given this dataset and know that it comes from a quadratic
function, but don't know the parameters. Let's assume they're somewhere between
`-5` and `5`, so give the model some initial parameters in that range (but not
the known correct ones).

In [4]:
class Model:
    def __init__(self):
        self.a = 1
        self.b = 1
        self.c = 1

    def predict(self, x):
        return self.a * x**2 + self.b * x + self.c


model = Model()

Let's compare the initial model to the dataset

In [5]:
def plot_predictions():
    initial_pred_y = model.predict(dataset_x)

    # Show absolute error for each prediction
    for i in range(len(dataset_x)):
        plt.plot(
            [dataset_x[i], dataset_x[i]],
            [dataset_y[i], initial_pred_y[i]],
            color="red",
            alpha=0.2,
            zorder=-1,
        )

    plt.scatter(dataset_x, dataset_y, label="Dataset")
    plt.scatter(dataset_x, initial_pred_y, label="Initial Prediction")
    plt.legend()
    plt.show()


plot_predictions()

<IPython.core.display.Javascript object>

In order to adjust the model to have better predictions, we need a measure of
how good (or how bad) they currently are. We'll define the commonly used Mean
Squared Error function and check the current value.

In [6]:
def get_mse(y_true, y_pred):
    return np.mean((y_true - y_pred)**2)

print(f"Initial MSE: {get_mse(dataset_y,  model.predict(dataset_x))}")

Initial MSE: 7.57742172161206


The following code will help with visualizing model behavior during training:

In [7]:
def show_anim(anim_preds):
    fig, ax = plt.subplots()
    ax.scatter(dataset_x, dataset_y, label="Dataset")
    pred_scatter = ax.scatter([], [], label="Predictions")
    ax.legend()


    def animate(i):
        pred_y = anim_preds[i]
        pred_scatter.set_offsets(np.stack([dataset_x, pred_y]).T)
        return (pred_scatter,)


    return FuncAnimation(fig, animate, frames=len(anim_preds), interval=30)

One strategy we can take to decrease error is to randomly adjust model
parameters, evaluate the error again, and then revert the changes if they made
it worse.

Inside the training loop, try adding a small random value (~0.1) to each
parameter. Make sure the random distribution you choose has negative values.

In [8]:
anim_preds = []

# Training loop
for i in range(400):
    old_params = [model.a, model.b, model.c]
    old_pred_y = model.predict(dataset_x)
    old_error = get_mse(dataset_y, old_pred_y)

    if i % 40 == 0:
        print(f"Iteration {i}: error = {old_error:.4f}")

    # Test new parameters
    model.a += np.random.normal(0, 0.1)
    model.b += np.random.normal(0, 0.1)
    model.c += np.random.normal(0, 0.1)

    new_pred_y = model.predict(dataset_x)
    if i % 10 == 0:
        anim_preds.append(new_pred_y)

    new_error = get_mse(dataset_y, new_pred_y)

    # Revert change to model parameters if new error is worse
    if new_error > old_error:
        model.a, model.b, model.c = old_params


show_anim(anim_preds)

Iteration 0: error = 7.5774
Iteration 40: error = 4.1979
Iteration 80: error = 2.1389
Iteration 120: error = 0.8448
Iteration 160: error = 0.3696
Iteration 200: error = 0.0681
Iteration 240: error = 0.0140
Iteration 280: error = 0.0001
Iteration 320: error = 0.0001
Iteration 360: error = 0.0001


<IPython.core.display.Javascript object>

This works pretty well for models with only a few parameters, but the
probability of random parameter changes significantly improving a model is
generally lower for models with more parameters.

Say that at some point in training we can best reduce error by decreasing the
first parameter, increasing the second, and decreasing the third. For models
with many parameters, the probability of randomly choosing the "correct"
direction for all of the parameters will be very low.

Since this model and error function are differentiable, we can find the
direction of change for the model's parameters that would most significantly
decrease error if these dataset samples are passed through the model again.

In practice, the following calculations will be done automatically for you by
libraries like PyTorch, but it might help to have some idea of what it's doing.

Model: $f(x) = ax^2 + bx + c$

Error function for one sample: $g(y_t, y_p) = (y_t - y_p)^2$

For dataset samples $x, y$, we want to minimize $g(y, f(x))$.

$E = g(y, f(x)) = g(y, ax^2 + bx + c) = (y - (ax^2 + bx + c))^2$

$= c^2 + 2bcx + b^2 x^2 + 2acx^2 + 2abx^3 + a^2 x^4 - 2cy - 2bxy - 2ax^2 y + y^2$

$\frac{\partial E}{\partial a} = 2x^2 (c + bx + ax^2 - y)$

$\frac{\partial E}{\partial b} = 2x (c + bx + ax^2 - y)$

$\frac{\partial E}{\partial c} = 2 (c + bx + ax^2 - y)$

We can now define a function to get the local best direction to move in
parameter space.

In [9]:
def gradient(model: Model, x, y_true):
    dda = 2 * x**2 * (model.c + model.b * x + model.a * x**2 - y_true)
    ddb = 2 * x * (model.c + model.b * x + model.a * x**2 - y_true)
    ddc = 2 * (model.c + model.b * x + model.a * x**2 - y_true)

    # x and y_true are arrays of multiple samples - take the aveage for
    # each parameter
    grad = np.array([dda.mean(), ddb.mean(), ddc.mean()])
    return grad

Now we can see what training looks like if we always step in the local best
direction.

In [10]:
# Re-initialize model and animation
model = Model()
anim_preds = []

# Training loop
for i in range(400):
    grad = gradient(model, dataset_x, dataset_y)
    model.a -= grad[0] * 0.05
    model.b -= grad[1] * 0.05
    model.c -= grad[2] * 0.05

    pred_y = model.predict(dataset_x)
    error = get_mse(dataset_y, pred_y)
    if i % 40 == 0:
        print(f"Iteration {i}: error = {error:.4f}")

    if i % 10 == 0:
        anim_preds.append(pred_y)


show_anim(anim_preds)

Iteration 0: error = 6.2713
Iteration 40: error = 1.0566
Iteration 80: error = 0.1950
Iteration 120: error = 0.0360
Iteration 160: error = 0.0066
Iteration 200: error = 0.0012
Iteration 240: error = 0.0002
Iteration 280: error = 0.0000
Iteration 320: error = 0.0000
Iteration 360: error = 0.0000


<IPython.core.display.Javascript object>

PyTorch provides a relatively simple and flexible interface to define many
different types of machine learning models in a way that allows it to
automatically handle gradient calculation and training updates. Here's an
example of how PyTorch can simplify training:

In [11]:
import torch
from torch import nn


class ModelV2(nn.Module):
    def __init__(self):
        super().__init__()
        # Initialxie parameters and let torch know that it should include
        # them in .parameters()
        # Make sure they have float datatype
        self.a = nn.Parameter(torch.tensor([1.0]))
        self.b = nn.Parameter(torch.tensor([1.0]))
        self.c = nn.Parameter(torch.tensor([1.0]))

    def forward(self, x):
        return self.a * x**2 + self.b * x + self.c


model_2 = ModelV2()
loss_fn = nn.MSELoss()

# Choose a strategy for using the gradient and a step size
optimizer = torch.optim.SGD(model_2.parameters(), lr=0.05)

In [12]:
# Re-initialize animation
anim_preds = []

# Convert dataset format so torch can track gradients
dataset_x_tensor = torch.tensor(dataset_x)
dataset_y_tensor = torch.tensor(dataset_y)

# Training loop
for i in range(400):
    pred_y = model_2(dataset_x_tensor)
    loss = loss_fn(pred_y, dataset_y_tensor)
    # Reset gradient accumulated in previous step
    model_2.zero_grad()
    # Accumulate gradient for this step
    loss.backward()
    # Update model parameters
    optimizer.step()

    if i % 40 == 0:
        print(f"Iteration {i}: error = {loss.item():.4f}")

    if i % 10 == 0:
        anim_preds.append(pred_y.detach().numpy())


show_anim(anim_preds)

Iteration 0: error = 7.5774
Iteration 40: error = 1.1022
Iteration 80: error = 0.2034
Iteration 120: error = 0.0375
Iteration 160: error = 0.0069
Iteration 200: error = 0.0013
Iteration 240: error = 0.0002
Iteration 280: error = 0.0000
Iteration 320: error = 0.0000
Iteration 360: error = 0.0000


<IPython.core.display.Javascript object>