This notebook is used for simple one and 2 variable examples and visualizations of gradient descent, momentum and linear regression using gradient descent

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math

import os

if not os.path.isdir("visualizations"):
    os.mkdir("visualizations")

In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

Show a simple gradient descent algorithm for the function
$$y=x^2$$

The actual minimum is $x=0$

Sources:
- https://nbviewer.jupyter.org/url/courses.d2l.ai/berkeley-stat-157/slides/4_30/gd-sgd.ipynb

In [None]:
def gd(lr, initial_x, epochs):
    x = initial_x
    x_values = [x]
    for i in range(epochs):
        x = x - lr * 2 * x # derivative of x^2
        x_values.append(x)
    return x_values

In [None]:
def show_trace(ax, x_values, func):
    n = max(abs(min(x_values)), abs(max(x_values)))
    f = np.arange(-n, n, 0.1)
    ax.plot(f, [func(x) for x in f]) # the actual function
    ax.plot(x_values, [func(x) for x in x_values], "-o", color="red") # the values calculated for x after every gd epoch
    ax.grid()
    ax.set_xlabel("x")
    ax.set_ylabel("y")

In [None]:
initial_x = -10
epochs = 10
func = lambda x: x**2
learning_rates = [0.01, 0.2, 0.8, 0.99]
infos = ["too small", "good", "too large", "way too large"]
fig, axs = plt.subplots(2, 2, figsize=(10, 7), gridspec_kw=dict(hspace=0.35))
fig.suptitle(r"Gradient Descent behaviour for $y=x^2$"f" and different learning rates ({epochs} epochs)")
i = 0
k = 0
for idx, lr in enumerate(learning_rates):
    ax = axs[i, k]
    ax.set_title(f"lr={lr} ({infos[idx]})")
    x_values = gd(lr, initial_x=initial_x, epochs=epochs)
    show_trace(ax=ax, x_values=x_values, func=func)
    k = (k + 1) % 2
    if k == 0:
        i += 1

plt.savefig("visualizations/gradient_descent_1d_example.png")

Show the difference between gradient descent and stochastic gradient descent for the multivariate function
$$y=x_1^2+2x_2^2$$

The actual minimum is $x_1=0$ and $x_2=0$

In [None]:
def gd_2d(lr, initial_point, epochs):
    p = initial_point
    results = [p]
    for i in range(epochs):
        x1 = p[0] - lr * 2 * p[0] # partial derivative for x1
        x2 = p[1] - lr * 4 * p[1] # partial derivate for x2
        p = (x1, x2)
        results.append(p)
    return results

In [None]:
def show_trace_2d(ax, points, func, x1lim=(-5.5, 1.0), x2lim=(-2.5, 0.5)):
    x1, x2 = np.arange(x1lim[0], x1lim[1], 0.1), np.arange(x2lim[0], x2lim[1], 0.1)
    X1, X2 = np.meshgrid(x1, x2)
    Z = func(X1, X2)
    labels = ax.contour(X1, X2, Z, 40)
    ax.autoscale(False)
    for p in points:
        ax.plot([p[0] for p in points], [p[1] for p in points], "-o", color="red")
    ax.set_xlabel("x1")
    ax.set_ylabel("x2")
    # ax.clabel(labels, inline=1, fontsize=13)

In [None]:
initial_point = (-5, -2)
epochs = 20
learning_rates = [0.01, 0.1, 0.5, 0.9]
infos = ["too small", "good", "too large", "way too large"]
fig, axs = plt.subplots(2, 2, figsize=(10, 7), gridspec_kw=dict(hspace=0.35))
fig.suptitle(r"Gradient Descent behaviour for $y=x_1^2+2x_2^2$"f" and different learning rates ({epochs} epochs)")
func = lambda X1, X2: X1**2 + 2*X2**2
i = 0
k = 0
for idx, lr in enumerate(learning_rates):
    ax = axs[i, k]
    ax.set_title(f"lr={lr} ({infos[idx]})")
    points = gd_2d(lr, initial_point=initial_point, epochs=epochs)
    show_trace_2d(axs[i, k], points, func)
    k = (k + 1) % 2
    if k == 0:
        i += 1

plt.savefig("visualizations/gradient_descent_2d_example.png")

The same thing for momentum

Sources:
- https://nbviewer.jupyter.org/url/courses.d2l.ai/berkeley-stat-157/slides/5_2/momentum.ipynb

In [None]:
def momentum_2d(lr, gamma, initial_point, epochs):
    prev_x1, prev_x2 = 0, 0
    p = initial_point
    results = [p]
    for i in range(epochs):
        prev_x1 = gamma * prev_x1 + lr * 2 * p[0] # partial derivative for x1
        prev_x2 = gamma * prev_x2 + lr * 4 * p[1] # partial derivative for x2
        x1 = p[0] - prev_x1
        x2 = p[1] - prev_x2
        p = (x1, x2)
        results.append(p)
    return results

In [None]:
initial_point = (-5, -2)
epochs = 20
learning_rates = [0.01, 0.1, 0.5, 0.9]
gammas = [0.4, 0.5, 0.9, 0.99]
fig, axs = plt.subplots(4, 4, figsize=(25, 14), gridspec_kw=dict(hspace=0.4))
fig.suptitle("Gradient Descent with momentum\n"r"behaviour for $y=x_1^2+2x_2^2$"f" and different learning rates ({epochs} epochs)")
func = lambda X1, X2: X1**2 + 2*X2**2
i = 0
k = 0
for idx, lr in enumerate(learning_rates):
    for kdx, gamma in enumerate(gammas):
        ax = axs[i, k]
        ax.set_title(f"lr={lr}" r"$\gamma$"f"={gamma}")
        points = momentum_2d(lr, gamma=gamma, initial_point=initial_point, epochs=epochs)
        show_trace_2d(axs[i, k], points, func, x1lim=(-5.5, 3.0), x2lim=(-2.5, 3.0))
        k = (k + 1) % 4
        if k == 0:
            i += 1

plt.savefig("visualizations/momentum_2d_example.png")

Using Gradient Descent for Linear Regression

Sources:
- https://towardsdatascience.com/linear-regression-using-gradient-descent-97a6c8700931

In [None]:
# generate training data
np.random.seed(666)
truth = lambda x: 1.5*x - 20
amount = 250
X = range(0, amount)
target_line = [truth(val) for val in X]
training_data = np.random.normal(loc=target_line, scale=amount/3, size=(1, amount))
training_data = training_data[0, :]
plt.scatter(X, training_data)
plt.plot(target_line, color="lime", linewidth=4)
plt.title(r"training data around truth function $y=\dfrac{3}{2}x-20$")
plt.xlabel("x")
plt.ylabel("y")

In [None]:
# model y = m * x + c
regression_model = lambda m, c, x: m * x + c

# data loader that enables either linear or shuffled loading
def build_data_loader(training_data, params={"type": "linear"}):
    current_idx = 0
    total = len(training_data)
    served_amount = 0
    
    def linear():
        for idx, val in enumerate(training_data):
            yield idx, val
    
    shuffled_copy = None
    indices = None
    def shuffled():
        nonlocal shuffled_copy
        if shuffled_copy == None:
            shuffled_copy = list(enumerate(np.copy(training_data)))
            np.random.shuffle(shuffled_copy)
        for idx, val in shuffled_copy:
            yield idx, val
    
    def mini_batch(batch_size):
        nonlocal shuffled_copy, current_idx, total, served_amount
        if shuffled_copy == None:
            shuffled_copy = list(enumerate(np.copy(training_data)))
            np.random.shuffle(shuffled_copy)
        
        while served_amount < total:
            batch = shuffled_copy[current_idx:current_idx + batch_size]
            current_idx += batch_size
            served_amount += batch_size
            yield batch
        
    
    data_loader = None
    if params["type"] == "linear":
        data_loader = {"type": "linear", "loader": linear}
    elif params["type"] == "shuffled":
        data_loader = {"type": "shuffled", "loader": shuffled}
    elif params["type"] == "mini-batch" and "batch-size" in params:
        data_loader = {"type": "mini-batch", "loader": lambda x=0: mini_batch(params["batch-size"])}
        
    data_loader["get_resetted_data_loader"] = lambda x=0: build_data_loader(training_data, params)
    
    return data_loader
    
# loss function: mean squared error (mse)
def mse(training_data, predictions):
    squared_error_sum = 0
    for idx, prediction in enumerate(predictions):
        diff = training_data[idx] - prediction # precalculated prediction = m * idx + c
        squared_error_sum +=  diff * diff
    mean = squared_error_sum / len(predictions)
    return mean

def partial_derivative_mse_m(training_data, predictions):
    error_sum = 0
    for idx, prediction in enumerate(predictions):
        diff = training_data[idx] - prediction
        error_sum += idx * diff
    return -2 / len(predictions) * error_sum

def partial_derivative_mse_c(training_data, predictions):
    error_sum = 0
    for idx, prediction in enumerate(predictions):
        diff = training_data[idx] - prediction
        error_sum += diff
    return -2 / len(predictions) * error_sum

def calc_predictions_and_loss(m, c, X, training_data):
    predictions = [regression_model(m, c, x) for x in X]
    return predictions, mse(training_data, predictions)

def calc_prediction_and_loss(m, c, x, example):
    prediction = regression_model(m, c, x)
    return prediction, mse([example], [prediction])

# training
def train_regression_model(epochs, data_loader=None, amount=-1, lr=0.0001, initial_m=0, initial_c=0):
    m, c = initial_m, initial_c
    loss_per_epoch = []
    for i in range(epochs):
        if data_loader == None: # calculate loss and predictions over all examples and do only one update of m and c.
            predictions, loss = calc_predictions_and_loss(m, c, X, training_data)
            loss_per_epoch.append(loss)
            m = m - lr * partial_derivative_mse_m(training_data, predictions)
            c = c - lr * partial_derivative_mse_c(training_data, predictions)
        elif data_loader["type"] == "mini-batch": # calc loss and predictions over a batch of examples and do one update of m and c per batch.
            if i > 0:
                data_loader = data_loader["get_resetted_data_loader"]()
            
            observed_examples = 0
            running_loss = 0
            for batch in data_loader["loader"]():
                indeces, examples = zip(*batch)
                predictions, loss = calc_predictions_and_loss(m, c, indeces, examples)
                running_loss += loss
                m = m - lr * partial_derivative_mse_m(examples, predictions)
                c = c - lr * partial_derivative_mse_c(examples, predictions)
            
            observed_examples += len(batch)
            if observed_examples > 0:
                loss_per_epoch.append(running_loss / observed_examples)
        else: # calculate loss and prediction for one example and directly do an update of m and c. One update for each example.
            if i > 0:
                data_loader = data_loader["get_resetted_data_loader"]()
            
            observed_examples = 0
            running_loss = 0
            for x, example in data_loader["loader"]():
                if amount != -1:
                    if observed_examples >= amount:
                        continue
                
                prediction, loss = calc_prediction_and_loss(m, c, x, example)
                running_loss += loss
                m = m - lr * partial_derivative_mse_m([example], [prediction])
                c = c - lr * partial_derivative_mse_c([example], [prediction])
                
                observed_examples += 1
                
            if observed_examples > 0:
                loss_per_epoch.append(running_loss / observed_examples)
    return m, c, loss_per_epoch

In [None]:
# test shuffled data loader
dl_shuffled = build_data_loader(training_data, params=dict(type="shuffled"))
for idx, val in dl_shuffled["loader"]():
    actual = training_data[idx]
    if actual != val:
        print(f"  ERROR: value at index {idx} is {val} but should be {actual}")

# test mini-batch data loader
dl_mini_batch = build_data_loader(training_data, params={"type": "mini-batch", "batch-size": 25})
for batch in dl_mini_batch["loader"]():
    for idx, val in batch:
        actual = training_data[idx]
        if actual != val:
            print(f"  ERROR: value at index {idx} is {val} but should be {actual}")

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(20, 5), gridspec_kw=dict(hspace=0.35))
left = axs[0]
right = axs[1]
fig.suptitle(f"Result of training the regression model for {epochs} epochs", y=1.15)

# You can play around with different epochs, learning rates, and data-loaders with different batch-sizes
epochs = 300
lr = 0.0015
batch_size = 5

# using no data loader performs one update per epoch
# data_loader = None

# linear data loader performs one parameter update per training example
# data_loader = build_data_loader(training_data, params={"type": "linear"})

# shuffled data loader also performs one parameter update per training example but the examples get shuffled randomly every epoch
# data_loader = build_data_loader(training_data, params={"type": "shuffled"})

# mini-batch data loader performs one update per batch of training examples and the examples get shuffled randomly every epoch
data_loader = build_data_loader(training_data, params={"type": "mini-batch", "batch-size": batch_size})

m, c, loss_per_epoch = train_regression_model(epochs=epochs, lr=lr, data_loader=data_loader)

predictions, _ = calc_predictions_and_loss(m, c, X, training_data)
left.scatter(X, training_data)
left.plot(target_line, color="lime", linewidth=4)
left.plot(predictions, color="red", linewidth=4)
left.set_title(r"training data around truth function $y=\dfrac{3}{2}x-20$" + "\n"
               + r"and learned function $y=mx+c$" + "\nwith m = "
               + f"{round(m, 4)} c = {round(c, 4)}")
left.legend(["truth line", "predicted line", "training data"])
left.set_xlabel("x")
left.set_ylabel("y")

right.plot(range(0, len(loss_per_epoch)), loss_per_epoch)
right.set_title(f"loss from Mean Squared Error\nwith latest loss: {round(loss_per_epoch[-1], 4)}")
right.set_xlabel("epoch")
right.set_ylabel("loss")

# plt.savefig("./visualizations/gradient_descent_linear_regression_example.png", bbox_inches="tight")