In [None]:
####################################################################
# Machine Learning Primer - Workshop
# Day 3 - September 2021
####################################################################

# python package imports
from __future__ import annotations
from typing import Callable, List, Optional, Tuple
from functools import partial

import numpy as np
import tensorflow as tf
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [None]:
####################################################################
# Functions to generate (fake) data that we would use for neural
# network regression
####################################################################

def gt_function(x: float | np.ndarray, true_params: np.ndarray):
    """Evaluate the ground-truth function used to synthesise the data.

    Computes ``true_params[0] * x + true_params[1] * x + true_params[2]``.
    Both leading coefficients multiply ``x`` to the first power, so the
    result is a straight line in ``x`` with slope
    ``true_params[0] + true_params[1]``.

    NOTE(review): if a quadratic was intended, the first term would need
    ``x ** 2`` -- confirm before changing, since it alters all generated data.

    Args:
        x: scalar or array of input values.
        true_params: indexable holding (at least) three coefficients.

    Returns:
        The function value for each entry of ``x``.
    """
    coeff_a = true_params[0]
    coeff_b = true_params[1]
    offset = true_params[2]
    return (coeff_a * x) + (coeff_b * x) + offset

def generate_fake_single_var_data(
    n_points: int,
    min_x: float,
    max_x: float,
    true_params: np.ndarray,
    noise_std: float,
) -> Tuple[np.ndarray, np.ndarray]:
    """Sample noisy (x, y) pairs around the ground-truth function.

    Args:
        n_points: number of sample data points (x, y) to generate.
        min_x: lower bound of the uniform distribution x is drawn from.
        max_x: upper bound of the uniform distribution x is drawn from.
        true_params: parameters forwarded to ``gt_function`` to compute the
            noise-free y for each x.
        noise_std: standard deviation of the zero-mean Gaussian noise added
            to each noise-free y.

    Returns:
        x: Vector of size (n_points), giving each sample's x value.
        noisy_y: Vector of size (n_points), giving each sample's y value.
    """
    # x is drawn before the noise, so the global RNG stream is consumed
    # in a fixed order (matters for reproducibility after np.random.seed)
    x = np.random.uniform(low=min_x, high=max_x, size=n_points)
    # noise-free targets from the ground-truth function
    clean_y = gt_function(x, true_params)
    # perturb every target with independent Gaussian noise
    noisy_y = clean_y + np.random.normal(scale=noise_std, size=n_points)
    return x, noisy_y

In [None]:
####################################################################
# Data plotting functionality
####################################################################

def get_hypothesis_scatter_plots(
    data_x: np.ndarray,
    data_y: np.ndarray,
    predictor: Optional[Callable] = None,
    x_min: Optional[float] = None,
    x_max: Optional[float] = None,
    gt_params: Optional[np.ndarray] = None,
) -> List[go.Scatter]:
    """Build the scatter traces for the data, the ground-truth curve and,
    optionally, a predictor's output.

    Args:
        data_x: Vector of x values
        data_y: Vector of corresponding y values
        predictor: Optional callable mapping an array of x values to
            predicted y values. If given, its output over the plotting
            range is added as a third trace.
        x_min: Lower end of the range over which the ground-truth curve and
            the predictions are evaluated. If omitted, the notebook-level
            ``min_x`` is used.
        x_max: Upper end of that range. If omitted, the notebook-level
            ``max_x`` is used.
        gt_params: Parameters passed to ``gt_function`` for the ground-truth
            curve. If omitted, the notebook-level ``true_params`` is used.

    Returns:
        List of scatter traces: the data, the ground-truth curve, and (when
        ``predictor`` is given) the predictions.
    """
    # Explicit arguments win; the notebook globals are only a fallback so
    # that existing calls keep working unchanged.
    range_lo = min_x if x_min is None else x_min
    range_hi = max_x if x_max is None else x_max
    curve_params = true_params if gt_params is None else gt_params

    # 100 evenly spaced x values for the ground-truth and prediction traces
    x_range = np.linspace(range_lo, range_hi, 100)
    scatter_plots = [
        # plot the data
        go.Scatter(
            x=data_x,
            y=data_y,
            mode="markers",
            marker_size=10,
            marker_color='blue',
            name="data"
        ),
        # plot the underlying GT curve
        go.Scatter(
            x=x_range,
            y=gt_function(x_range, curve_params),
            mode='lines',
            line_dash='dash',
            line_color='green',
            name='ground-truth'
        )
    ]
    if predictor is not None:
        # plot the predictor's output over the same x range
        scatter_plots.append(
            go.Scatter(
                x=x_range,
                y=predictor(x_range),
                mode='markers',
                marker_size=10,
                line_color='red',
                name='predictions'
            )
        )
    return scatter_plots


def plot_gradient_descent_info(
    data_x: np.ndarray,
    data_y: np.ndarray,
    predictor: Callable,
    loss_history: np.ndarray,
):
    """Draws the data with the predictor's fit, and the loss vs training step.

    Args:
        data_x: Vector of x values
        data_y: Vector of corresponding y values
        predictor: Callable mapping an array of x values to predicted y
            values; drawn on top of the data in the left-hand panel.
        loss_history: Loss value recorded at each training step (from the
            oldest to the newest). An array or any sequence that
            ``np.asarray`` can convert.
    """
    # accept plain lists as well as arrays; an ndarray passes through as-is
    loss_history = np.asarray(loss_history)

    # shared axis styling: boxed axes with outside ticks
    common_axis = dict(
        mirror=True,
        ticks='outside',
        showline=True,
        linewidth=2,
        linecolor='black'
    )

    fig = make_subplots(
        rows=1,
        cols=2,
        subplot_titles=("Test data fit", "Loss w/ iterations")
    )

    # left panel: the data, the ground-truth curve, and the predictions
    scatter_plots = get_hypothesis_scatter_plots(
        data_x=data_x,
        data_y=data_y,
        predictor=predictor,
    )
    for plot in scatter_plots:
        fig.add_trace(plot, row=1, col=1)
    fig.update_xaxes(title_text="x (input)", row=1, col=1, **common_axis)
    fig.update_yaxes(title_text="y (output)", row=1, col=1, **common_axis)

    # right panel: loss against the (1-based) training step
    fig.add_trace(
        go.Scatter(
            x=list(range(1, loss_history.size + 1)),
            y=loss_history,
            mode="markers+lines",
            name="loss",
        ),
        row=1, col=2,
    )
    fig.update_xaxes(title_text="Step", row=1, col=2, **common_axis)
    fig.update_yaxes(title_text="Loss", row=1, col=2, **common_axis)

    # move the legend: horizontal, above the plots, right-aligned
    fig.update_layout(
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.05,
            xanchor="right",
            x=1
        )
    )

    fig.show()

In [None]:
####################################################################
# Generate some (fake) data to do supervised regression
####################################################################

# fix the global numpy seed so the generated data is reproducible
np.random.seed(1)

# number of samples in the training and test sets
N_train, N_test = 200, 30

# interval x is sampled from, and the noise level added to y
min_x, max_x = -20, 10
noise_std = 0

# coefficients handed to gt_function
true_params = [10, 20, 1]

# training data drawn from the underlying ground-truth function
train_x, train_y = generate_fake_single_var_data(
    N_train, min_x, max_x, true_params, noise_std
)

# test data drawn the same way (continues the same seeded random stream)
test_x, test_y = generate_fake_single_var_data(
    N_test, min_x, max_x, true_params, noise_std
)

####################################################################
# Plot the generated data (and the underlying ground-truth function)
####################################################################

fig = go.Figure(
    layout={
        "xaxis_title": "x (input)",
        "yaxis_title": "y (output)",
        "title": "Data and the underlying function",
        "title_x": 0.5,
    }
)

# no predictor yet: only the training data and the ground-truth curve
scatter_plots = get_hypothesis_scatter_plots(data_x=train_x, data_y=train_y)
for plot in scatter_plots:
    fig.add_trace(plot)

fig.show()

In [None]:
def forward_pass(data_x: np.ndarray, curr_params: np.ndarray) -> np.ndarray:
    """Evaluate the network ``y = w3 + w4 * sigmoid(w1 + w2 * x)``.

    Args:
        data_x: input x value(s).
        curr_params: the four network weights, unpacked as
            ``(w1, w2, w3, w4)``.

    Returns:
        The network output for each entry of ``data_x``.
    """
    bias_in, weight_in, bias_out, weight_out = curr_params
    # affine map of the input feeding the single hidden unit
    pre_activation = bias_in + weight_in * data_x
    # logistic sigmoid of the hidden unit's input
    activation = 1 / (1 + np.exp(-pre_activation))
    # affine map of the hidden activation gives the output
    return bias_out + weight_out * activation


def l2_loss(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """Mean squared error between the predictions and the targets.

    Args:
        y_pred: predicted y values.
        y_true: target y values.

    Returns:
        The mean over all samples of ``(y_pred - y_true) ** 2``.
    """
    residual = y_pred - y_true
    return np.mean(residual ** 2)


def params_grad(
    data_x: np.ndarray,
    y_true: np.ndarray,
    curr_params: np.ndarray
):
    """Gradient of the mean squared error w.r.t. the four network weights.

    The network is ``y = w3 + w4 * sigmoid(w1 + w2 * x)``; the gradients are
    obtained with the chain rule and averaged over all samples.

    Args:
        data_x: input x value(s).
        y_true: target y value for each input.
        curr_params: the four network weights, unpacked as
            ``(w1, w2, w3, w4)``.

    Returns:
        Array ``[dL/dw1, dL/dw2, dL/dw3, dL/dw4]``.
    """
    w1, w2, w3, w4 = curr_params

    # forward pass, keeping the hidden activation for the backward pass
    hidden = 1 / (1 + np.exp(-(w1 + w2 * data_x)))
    y_pred = w3 + w4 * hidden

    # derivative of the squared error w.r.t. the output, per sample
    dloss_dy = 2 * (y_pred - y_true)
    # back through the output weight and the sigmoid (sigmoid' = v * (1 - v))
    dloss_du = dloss_dy * w4 * hidden * (1 - hidden)

    per_sample_grads = (
        dloss_du,            # w1: du/dw1 = 1
        dloss_du * data_x,   # w2: du/dw2 = x
        dloss_dy,            # w3: dy/dw3 = 1
        dloss_dy * hidden,   # w4: dy/dw4 = sigmoid(u)
    )
    # average every per-sample gradient over the data set
    return np.array([np.mean(grad) for grad in per_sample_grads])

    
def gradient_descent_step(
    data_x: np.ndarray,
    y_true: np.ndarray,
    curr_params: np.ndarray,
    learning_rate: float
) -> np.ndarray:
    """Runs a single step of gradient descent on the network weights.

    Args:
        data_x: Vector of x values
        y_true: Vector of corresponding target y values
        curr_params: The current network weights, at which the gradient is
            evaluated
        learning_rate: Step size for gradient descent

    Returns:
        The updated weights, ``curr_params - learning_rate * gradient``.
        ``curr_params`` itself is not modified.
    """
    # gradient of the loss w.r.t. every weight at the current position
    grad_w = params_grad(
        data_x=data_x,
        y_true=y_true,
        curr_params=curr_params,
    )

    # step against the gradient; this builds a new array rather than
    # updating curr_params in place
    return curr_params - learning_rate * grad_w

In [None]:
####################################################################
# Run gradient descent for n_steps
####################################################################

n_steps = 100

# starting weights (w1, w2, w3, w4) for the network
curr_params = np.array([1, 1, 1, 1])

# step size used for every gradient descent update
learning_rate = 1e-1

# loss recorded at each step, measured before that step's update
loss_history = []

for idx in range(n_steps):
    # forward pass through the network, then the loss on the training set
    y_pred = forward_pass(train_x, curr_params)
    loss = l2_loss(y_pred, train_y)
    loss_history.append(loss)

    # one gradient descent update using the back-propagated gradients
    curr_params = gradient_descent_step(
        train_x, train_y, curr_params, learning_rate
    )

    # the loss shown is pre-update; the weights shown are post-update
    print(f"Step {idx: <3},  Cost: {loss:.5f},  {curr_params}")

# plot the final fit on the test data together with the loss curve
plot_gradient_descent_info(
    test_x,
    test_y,
    partial(forward_pass, curr_params=curr_params),
    np.array(loss_history),
)