# Lab 2: Linear Regression with MLE and MAP

In this lab, you will experiment with a linear regression model. The data and optimization method can be changed. Try them out and see what happens. Which optimization methods are better? Why? Could they be adapted for different scenarios?

As mentioned in the lecture, least square linear regression can often be done analytically. The main pedagogical point of this lab is to illustrate finding model parameters using interative optimization. Regression models are primarily used as illustration.

There are several regression models where the parameters cannot be found analytically. Most of these models fall outside the scope of this course.

As usual, we start with imports.

In [None]:
import numpy as np
import time

import matplotlib.pyplot as plt
import ipywidgets
# from IPython import display

%matplotlib inline

## Data

Do you remember Anscombe's Quartet? There are four sets of points here. All have the same means, variances, and best parameters (for a least square line).

In [None]:
def anscombe(n=1):
  """Load one set of Anscombe's Quartet"""
  assert 1 <= n and n <= 4
  X = np.vstack([10, 8, 13, 9, 11, 14, 6, 4, 12, 7, 5])
  if n == 1:
    y = [8.04, 6.95, 7.58, 8.81, 8.33, 9.96, 7.24, 4.26, 10.84, 4.82, 5.68]
  elif n == 2:
    y = [9.14, 8.14, 8.74, 8.77, 9.26, 8.10, 6.13, 3.10, 9.13, 7.26, 4.74]
  elif n == 3:
    y = [7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73]
  elif n == 4:
    X = [8, 8, 8, 8, 8, 8, 8, 19, 8, 8, 8]
    y = [6.58, 5.76, 7.71, 8.84, 8.47, 7.04, 5.25, 12.50, 5.56, 7.91, 6.89]
  X = np.vstack(X)
  y = np.asarray(y)
  return X, y

X, y = anscombe()

## The first model

For simplicity, we can define a model with intercept and slope as explicit arguments.

In [None]:
def simple_model(x, intercept, slope):
  """Simple linear model"""
  ret = intercept + x*slope
  return ret.ravel() # Sklearn output data is flat, like this

Each set in Anscombe's Quartet looks different but has the same parameters. You can change these to get a different linear model.

In [None]:
intercept = 3
slope = .5

Now for plotting the loaded data and the model given predefined parameter values.

In [None]:
plt.figure(figsize=(5, 5), dpi=100)

x_plot = np.asarray([X.min(), X.max()])
plt.plot(x_plot, simple_model(x_plot, intercept, slope), color='C1', linewidth=3, zorder=0)

for i in range(X.shape[0]):
  plt.plot([X[i], X[i]], [y[i], simple_model(X[i], intercept, slope)], 'k--', alpha=.3, zorder=-1)

plt.scatter(X, y, color='C0', edgecolors='k', zorder=1)

plt.axis('equal')
plt.show()

By changing the parameters in a widget, we can see different models and their loss right away without rerunning cells.

In [None]:
def interactive_residual_plot(intercept, slope):
  # Create the plot
  plt.figure(figsize=(5, 5), dpi=100)
  # Plot regression line
  x_plot = np.asarray([X.min(), X.max()])
  plt.plot(x_plot, intercept + slope*x_plot, color='C1', linewidth=3, zorder=0, label="Trendline")
  # Plot training data
  plt.scatter(X, y, color='C0', edgecolors='k', zorder=1, label="Data")
  # Error 'path'
  for i in range(X.shape[0]):
    plt.plot([X[i], X[i]], [y[i], intercept + slope*X[i]], 'k--', alpha=.3, zorder=-1)
  # Fix look
  plt.axis('equal')
  plt.legend(loc='lower right')
  # Find and print the residual
  residual = np.sum((y - (intercept + slope*X.ravel()))**2)
  plt.title("Residual (squared error): %.1f" % residual)
  # Show the plot
  plt.show()

interact_plot = ipywidgets.interact(interactive_residual_plot,
                                    intercept=ipywidgets.FloatSlider(value=5, min=0, max=10, step=0.1, description="Intercept"),
                                    slope=ipywidgets.FloatSlider(value=0, min=-1, max=1, step=0.05, description="Slope"));
output = interact_plot.widget.children[-1] # This should prevent flickering
output.layout.height = '500px'

## Parameter optimization
 
What are the best parameter values? Any best/good set of parameters are always evaluated using some quality metric, often called "loss." For simple linear regression, the loss function is squared loss, as in the code above. This is defined as $\sum_i (y_i - \hat y_i)^2$. We could image other loss functions for simple regression, like using the $l_1$ distance instead of $l_2$ (squared euclidean distance). The loss function then becomes $\sum_i |y_i - \hat y_i|$.

Maybe the most simple way of finding the best parameters would be to randomly choose the values for the parameters and only keep the set with the lowest loss (i.e., the lowest residual). Let's see what this might look like.

In [None]:
def model(x, theta):
  """Basic linear model"""
  ret = theta[0] + x*theta[1]
  return ret.ravel()
  
def lnorm_loss(y, y_hat, l=2):
  """Finds the residual given the target data (y) and the predictions (y_hat)."""
  if l==2:
    return np.sum((y - y_hat)**2) # l2 loss
  elif l==1:
    return np.sum(np.abs(y - y_hat)) # l1 loss

In [None]:
for i in range(10):
  theta = np.random.uniform(-10, 10, size=2)  # Random guess (within constraints) for parameters.
  print("theta: %.1f, %.1f \tl2 loss: %.1f   \tl1 loss: %.1f" % (theta[0], theta[1], lnorm_loss(y, model(X, theta), l=2), lnorm_loss(y, model(X, theta), l=1)))

This will take some time, even when the distribution is constrained. There must be a better way!
 
We could imagine other ways. Below, I have defined a couple of ways to do optimization. Go through them all (not necessarily in order) and try to understand what is happening. Each algorithm uses the model, data, and residual defined above. When you want to change something from above, simply load the new data or rewrite (and reload) the loss function to use some other metric. Each algorithm will use your `model` and `loss` functions and return a history of parameters that can be used with the plotting functions further down in this notebook.

In [None]:
def coordinate_descent(function_to_minimize, initial_theta, n_iterations=50, step=.25):
  """Coordinate descent (look at wikipedia for definition)"""
  theta = initial_theta.copy()
  history = [initial_theta.copy()]
  function_values = [function_to_minimize(initial_theta)]
  for iteration in range(n_iterations):
    theta_changed_during_iter = False
    for dim in range(len(initial_theta)):
      for direction in [-1, 1]:
        new_theta = theta.copy()
        new_theta[dim] += direction*step
        while function_to_minimize(new_theta) < function_to_minimize(theta):
          theta = new_theta
          theta_changed_during_iter = True
          history.append(theta)
          function_values.append(function_to_minimize(new_theta))
          new_theta = theta.copy()
          new_theta[dim] += direction*step
    if not theta_changed_during_iter:
      step /= 2
      # break
  return theta, function_values, np.concatenate([np.vstack(h).T for h in history])

def finite_difference_gradient_descent(function_to_minimize, initial_theta, n_iterations=20, gamma=.01):
  """Gradient descent optimization, where the gradient is approximated using
  finite difference. Look at scipy.optimize.approx_fprime to learn more."""
  from scipy.optimize import approx_fprime
  theta = initial_theta.copy()
  history = [initial_theta.copy()]
  function_values = [function_to_minimize(initial_theta)]
  for iteration in range(n_iterations):
    gradient = approx_fprime(theta, function_to_minimize, epsilon=.001)
    theta = theta - gamma*gradient
    history.append(theta)
    function_values.append(function_to_minimize(theta))
  return theta, function_values, np.concatenate([np.vstack(h).T for h in history])

def random_changes(function_to_minimize, initial_theta, n_iterations=200, noise_sd=.1):
  """Adds random noise to the parameter vector in the hope of 
  improving the loss."""
  theta = initial_theta.copy()
  history = [initial_theta.copy()]
  function_values = [function_to_minimize(initial_theta)]
  for iteration in range(n_iterations):
    new_theta = theta + np.random.normal(scale=noise_sd, size=theta.shape)
    if function_to_minimize(new_theta) < function_to_minimize(theta):
      theta = new_theta
    history.append(theta.copy())
    function_values.append(function_to_minimize(theta))
  return theta, function_values, np.concatenate([np.vstack(h).T for h in history])

def scipy_minimize(function_to_minimize, initial_theta):
  """Optimization using external library. Look at the manual of 
  scipy.optimize.minimize for specifics."""
  from scipy.optimize import minimize
  history = [initial_theta.copy()]
  function_values = [function_to_minimize(initial_theta)]
  def store_theta_during_optimization(current_theta):
    history.append(current_theta)
    function_values.append(function_to_minimize(current_theta))
  res = minimize(lambda t: function_to_minimize(t), initial_theta, 
                 callback=store_theta_during_optimization)
  return theta, function_values, np.concatenate([np.vstack(h).T for h in history])

In [None]:
def plot_loss_landscape(history, loss_function):
  xx, yy = np.meshgrid(np.linspace(min(history[:, 0].min(), -5), max(history[:, 0].max(), 10), 100), # Intercept
                      np.linspace(min(history[:, 1].min(), -1), max(history[:, 1].max(), 2), 100)) # Slope
  Z = np.apply_along_axis(lambda theta: loss_function(theta), 1, 
                          np.concatenate([np.vstack(xx.ravel()), np.vstack(yy.ravel())], axis=1))
  Z = Z.reshape(xx.shape)

  fig = plt.figure(figsize=(6, 6))
  ax = fig.subplots(1, 1)
  ax.contourf(xx, yy, np.log(Z))
  ax.set_xlabel("Intercept")
  ax.set_ylabel("Slope")
  ax.plot([h[0] for h in history], [h[1] for h in history], 'r.-', alpha=.7, zorder=1, label="Path to optimum")
  ax.scatter(history[-1, 0], history[-1, 1], s=100, color='r', edgecolors='k', zorder=2, label="Optimum")
  ax.legend()
  fig.tight_layout()
  fig.show()

def plot_loss(loss):
  fig = plt.figure(figsize=(8, 4))
  ax = fig.subplots(1, 1)
  ax.plot(loss, linewidth=2)
  # ax.set_yscale('log')
  ax.set_ylabel("Loss")
  ax.set_xlabel("Step")
  fig.show()

In [None]:
# Be carefull when defining functions like this. It convinient here but might
# create confusion due to it using global data.
loss_function = lambda theta: lnorm_loss(y, model(X, theta), l=2)

# theta, loss, history = coordinate_descent(function_to_minimize=loss_function,
#                                           initial_theta=np.random.uniform(-5, 5, size=2), 
#                                           n_iterations=5000, step=.1)

# theta, loss, history = finite_difference_gradient_descent(function_to_minimize=loss_function,
#                                           initial_theta=np.random.uniform(-5, 5, size=2), 
#                                           n_iterations=20, gamma=.0001)

theta, loss, history = random_changes(function_to_minimize=loss_function,
                                      initial_theta=np.random.uniform(-5, 5, size=2),
                                      n_iterations=200, noise_sd=.1)

# theta, loss, history = scipy_minimize(function_to_minimize=loss_function,
#                                       initial_theta=np.random.uniform(-5, 5, size=2))

plot_loss(loss)

In [None]:
plot_loss_landscape(history, loss_function=loss_function)

In [None]:
def plot_wrapper(iteration):
  intercept, slope = history[iteration]
  print("Intercept: %.1f, Slope: %.1f" % (intercept, slope))
  interactive_residual_plot(intercept, slope)
interact_plot = ipywidgets.interact(plot_wrapper,
                                    iteration = ipywidgets.Play(min=0, max=len(history)-1, step=max(1, len(history)//100), value=0));
output = interact_plot.widget.children[-1] # This should prevent flickering
output.layout.height = '500px'
# Note that the play button doesn't always show up in colab. The clickable area is still there though (leftmost gray area).

In [None]:
from mpl_toolkits.mplot3d.axes3d import Axes3D

xx, yy = np.meshgrid(np.linspace(-5, 10, 100), # Intercept
                    np.linspace(-1, 2, 100)) # Slope

fig = plt.figure(figsize=(16, 8))
for order in [1, 2]:
  ax = fig.add_subplot(1, 2, order, projection='3d')
  Z = np.apply_along_axis(lambda theta: lnorm_loss(y, model(X, theta), l=order), 1, 
                          np.concatenate([np.vstack(xx.ravel()), np.vstack(yy.ravel())], axis=1))
  Z = Z.reshape(xx.shape)
  ax.plot_surface(xx, yy, Z, cmap='viridis', rstride=5, cstride=5)
  ax.set_title("$l_%i$ loss surface" % order)
  ax.set_xlabel("Intercept")
  ax.set_ylabel("Slope")
  ax.set_zlabel("Loss")
  ax.view_init(30, 120)
plt.show()

## Probabilistic model

This is an implementation of the slide on Bayesian linear regression. To get a simple model and also being true to the lecture slides, the hack for the variance is also implemented. Try to understand the model, using both the code and the slides. Minimizing the negative of this loss function is the same as maximizing the probability (Maximum A Posteriori, MAP). See this as both an exercise in modelling and parsing code.

In [None]:
def bayesian_model(x, theta):
  """Bayesian linear model"""
  ret = theta[0] + x*theta[1]
  return ret.ravel()

from scipy.stats import norm

def model_log_likelihood(X, y, theta):
  """log likelihood of the model
  Note that the bayesian_model function name is hardcoded here. This is bad design."""
  return np.sum(norm(scale=theta[2]).logpdf(y-bayesian_model(X, theta)))

In [None]:
print("Intercept\tSlope\tNoise\t\tLog loss")
for i in range(10):
  theta = np.random.uniform(-10, 10, size=3)
  theta[2] = np.abs(theta[2]) # Theta_2 must not be negative, why?
  print("%.1f\t\t%.1f\t%.1f\t->\t%.1f" % (theta[0], theta[1], theta[2], model_log_likelihood(X, y, theta)))

In [None]:
from mpl_toolkits.mplot3d.axes3d import Axes3D

xx, yy = np.meshgrid(np.linspace(-5, 10, 100), # Intercept
                    np.linspace(-1, 2, 100)) # Slope

fig = plt.figure(figsize=(16, 8))
for i, noise_level in enumerate([2, 6]):
  ax = fig.add_subplot(1, 2, i+1, projection='3d')
  thetas_on_grid = np.concatenate([np.vstack(xx.ravel()), np.vstack(yy.ravel())], axis=1)
  thetas_on_grid = np.append(thetas_on_grid, noise_level*np.ones((thetas_on_grid.shape[0], 1)), axis=1)
  Z = np.apply_along_axis(lambda theta: model_log_likelihood(X, y, theta), 1, thetas_on_grid)
  Z = Z.reshape(xx.shape)
  ax.plot_surface(xx, yy, np.exp(Z), cmap='viridis')
  ax.set_title("Distribution over intercept and slope with fixed noise=%.1f" % noise_level)
  ax.set_xlabel("Intercept")
  ax.set_ylabel("Slope")
  ax.set_zlabel("Log Likelihood")
  ax.view_init(30, 120)
plt.show()

In [None]:
from scipy.optimize import minimize

history = [np.asarray([3, .5, .2])]

def store_theta_during_optimization(current_theta):
  history.append(current_theta)
res = minimize(lambda theta: -model_log_likelihood(X, y, theta), history[0], 
               bounds=[(None, None), (None, None), (1e-3, None)],
               callback=store_theta_during_optimization)
theta = res.x

print("MAP estimate of theta: %s" % theta)

In [None]:
intercept, slope = theta[:2]
plt.figure(figsize=(5, 5), dpi=100)
# Plot regression line
x_plot = np.asarray([X.min(), X.max()])
plt.plot(x_plot, intercept + slope*x_plot, color='C1', linewidth=3, zorder=0, label="Trendline")
plt.plot(x_plot, theta[2] + intercept + slope*x_plot, '--', color='C1', linewidth=1, zorder=0, label="1st sd")
plt.plot(x_plot, -theta[2] + intercept + slope*x_plot, '--', color='C1', linewidth=1, zorder=0)
plt.plot(x_plot, 2*theta[2] + intercept + slope*x_plot, '-.', color='C1', linewidth=1, zorder=0, label="2nd sd")
plt.plot(x_plot, -2*theta[2] + intercept + slope*x_plot, '-.', color='C1', linewidth=1, zorder=0)
# Plot training data
plt.scatter(X, y, color='C0', edgecolors='k', zorder=1, label="Data")
# Error 'path'
for i in range(X.shape[0]):
  plt.plot([X[i], X[i]], [y[i], intercept + slope*X[i]], 'k--', alpha=.3, zorder=-1)
# Fix look
plt.axis('equal')
plt.legend(loc='lower right')
# Find and print the residual
residual = np.sum((y - (intercept + slope*X.ravel()))**2)
plt.title("Residual (squared error): %.1f" % residual)
# Show the plot
plt.show()