# Chapter 11 - Gradient Descent

In [None]:
import sys
sys.path.append("../")
from utils import *
np.random.seed(0)

The most plain implementation of gradient descent, for minimizing a differentiable function $f$

In [None]:
def VanillaGradientDescent(f, f_grad, init=np.random.uniform(-1, 1, 2), eta=lambda t: .1, tol=1e-5):
    steps, delta = [init], tol

    t = 1
    while delta >= tol:
        g, eta_t = f_grad(steps[-1]), eta(t)
        step = steps[-1] - eta_t * g
        
        steps.append(step)
        delta = np.sum((steps[-1] - steps[-2])**2)**.5
        t += 1
        
    return np.array(steps)

The following functions are used for plotting (in 2D and 3D) the loss surface of a given function to optimize

In [None]:
def as_array(x):
    return np.array([x]) if np.isscalar(x) else x

def function_contour(fun, vals):
    xx, yy = np.meshgrid(vals, vals)
    z = fun(np.c_[xx.ravel(), yy.ravel()]).reshape(len(vals), len(vals))
    return go.Contour(x = vals, y=vals, z=z, opacity=.4, colorscale="Blues_r", showscale=False)

def function_surface(fun, vals):
    xx, yy = np.meshgrid(vals, vals)
    z = fun(np.c_[xx.ravel(), yy.ravel()]).reshape(len(vals), len(vals))
    return go.Surface(x = vals, y=vals, z=z, opacity=.4, colorscale="Blues_r", showscale=False)

### Optimize RSS Using GD

In [None]:
def RSS(X: np.ndarray, y: np.ndarray):
    def _evaluate(w: np.ndarray):
        Y = np.broadcast_to(y[..., np.newaxis], (y.shape[0], w.shape[0]))
        return np.sum( (X @ w.T - Y)**2, axis=0)
    
    def _gradient(w: np.ndarray):
        return 2 * X.T @ (X @ w.T - y)
    
    return _evaluate, _gradient


n = 50
w = np.random.random(size = (2, ))
X = np.c_[np.random.uniform(low=-3, high=3, size=(n, 1)), np.ones((n, 1))]
y = X @ w + np.random.normal(0, 1, size=(n,))

Using the RSS module above (the evaluation and gradient computation) functions above we explore the gradient descent algorithm. First, we can track the stepping of the algorithm in the parameter space (i.e. obaining different feasible solutions $\mathbf{w}$ at each iteration) and observe the linear model it reflects 

In [None]:
f, f_grad = RSS(X, y)

# Run the GD algorithm
steps = VanillaGradientDescent(f, f_grad, 
                               init=np.array([4.5,-4]), 
                               eta=lambda t: .005,
                               tol=5*1e-3)

# Obtain objective surface
vals = np.linspace(-5, 5, 50)
contour = function_contour(f, vals)
    
frames, markers = [], []
for i in range(1, len(steps)+1):
    z = as_array(f(steps[:i]))
    frames.append(go.Frame(data=[
        # 2D visualization of progress
        go.Scatter(x=steps[:i,0], y=steps[:i,1], marker=dict(size=3, color="black"), showlegend=False),
        go.Scatter(x=[steps[i-1,0]], y=[steps[i-1,1]], marker=dict(size=5, color="red"), showlegend=False), 
        contour,

        # Visualization of regression line and data
        go.Scatter(x=X[:, 0], y=y, marker=dict(size=5, color="black"), mode = 'markers', showlegend=False, xaxis="x2", yaxis="y2"),
        go.Scatter(x=[X[:, 0].min(), X[:, 0].max()], 
                   y=[X[:, 0].min()*steps[i-1,0] + steps[i-1,1], X[:, 0].max()*steps[i-1,0] + steps[i-1,1]],
                   marker=dict(size=3, color="Blue"), mode='lines', showlegend=False, xaxis="x2", yaxis="y2")],
        traces=[0, 1, 2, 3, 4, 5],
        layout=go.Layout(title=rf"$\text{{Iteration }} {i}/{steps.shape[0]}$" )))

# Create animated figure
fig = make_subplots(rows=1, cols=2, column_widths = [400, 700], horizontal_spacing=.075,
                    subplot_titles=(r"$\text{RSS Loss Surface}$", r"$\text{Fitted Model}$"))\
    .update_layout(width=1100, height = 400, title = frames[0].layout.title,
                   updatemenus = [dict(type="buttons", buttons=[AnimationButtons.play(1200,0), 
                                                                AnimationButtons.pause()])])

fig = fig.add_traces(frames[0]["data"], rows=1, cols=[1, 1, 1, 2, 2])\
         .update(frames = frames)

fig = fig.update_xaxes(range=[vals[0], vals[-1]], title=r"$\text{Coefficient }w_1$", col=1)\
         .update_yaxes(range=[vals[0], vals[-1]], title=r"$\text{Coefficient }w_2$", col=1)\
         .update_xaxes(title=r"$\text{Variable } x$", col=2)\
         .update_yaxes(range=[min(y)-.5, max(y)+.5], title=r"$\text{Response }y$", col=2)
    
animation_to_gif(fig, "../figures/rss_gd_opt.gif", 700, width=1100, height=400)
fig.show()

Next, we examin the RSS optimization process for different constant values of the step size

In [None]:
f, f_grad = RSS(X, y)

vals = np.linspace(-5, 5, 50)
contour = function_contour(f, vals)

eta = .0065
steps = VanillaGradientDescent(f, f_grad, eta=lambda t: eta,tol = 1e-5, init=np.array([4.5,-4]))

fig = go.Figure(data = 
          [go.Scatter(x=steps[:,0], y=steps[:,1], marker=dict(size=3, color="black"), mode="markers+lines", showlegend=False),
           contour],
          layout = go.Layout(
              width=400, height=400,
              xaxis = dict(title = r"$\text{Coefficient }w_1$", range=[-5,5]),
              yaxis = dict(title = r"$\text{Coefficient }w_2$", range=[-5,5]),
              title = rf"$\text{{Step Size: }}\eta={eta} \text{{ (}}n={len(steps)}\text{{ Iterations)}}$"
          ))

fig.write_image(f"../figures/rss_gd_eta_{eta}.png")
fig.show()

## Visualize 2/3D Traverse In Parameter Space For GD Iterations

In [None]:
def Animate_GradientDescent(f, f_grad, init, eta, delta, axis_range, frame_time=500):    
    steps = VanillaGradientDescent(f, f_grad, init, eta, delta)
    surface, contour = function_surface(f, axis_range), function_contour(f, axis_range)
    
    frames, markers = [], []
    for i in range(1, len(steps) + 1):
        z = as_array(f(steps[:i]))       
        frames.append(go.Frame(data=[
            # 3D visualization of progress
            go.Scatter3d(x=steps[:i,0], y=steps[:i,1], z=z[:i], marker=dict(size=3, color="black"), showlegend=False),
            go.Scatter3d(x=[steps[i-1,0]], y=[steps[i-1,1]], z=[z[i-1]],marker=dict(size=5, color="red"), showlegend=False), 
            surface,
            
            # 2D visualization of progress
            go.Scatter(x=steps[:i,0], y=steps[:i,1], marker=dict(size=3, color="black"), mode="markers+lines", showlegend=False),
            go.Scatter(x=[steps[i-1,0]], y=[steps[i-1,1]], marker=dict(size=5, color="red"), showlegend=False), 
            contour],
            traces=[0, 1, 2, 3, 4, 5],
            layout=go.Layout(title=rf"$\text{{Iteration }} {i}/{steps.shape[0]}$" )))

    return make_subplots(rows=1, cols=2, specs=[[{'type':'scene'}, {}]],
                         subplot_titles=('3D Visualization Of Function', '2D Visualization Of Function'))\
        .add_traces(data=frames[0]["data"], rows=[1, 1, 1, 1, 1, 1], cols=[1, 1, 1, 2, 2, 2])\
        .update(frames = frames)\
        .update_xaxes(range=[axis_range[0], axis_range[-1]])\
        .update_yaxes(range=[axis_range[0], axis_range[-1]])\
        .update_layout(width=900, height = 330, title = frames[0].layout.title,
                       updatemenus = [dict(type="buttons", buttons=[AnimationButtons.play(frame_time,0), 
                                                                    AnimationButtons.pause()])])

## Gradient Descent Over Gaussian Function

In [None]:
from numpy.linalg import solve, det

def negative_gaussian(mu=np.zeros(2), cov=np.eye(2)):
    from scipy.stats import multivariate_normal
    
    def _evaluate(x: np.ndarray):
        return  - multivariate_normal(mu, cov).pdf(x)

    def _gradient(x: np.ndarray):
        z = solve(cov,x-mu)
        return np.exp(-z @ (x-mu) /2) * z / (2*np.sqrt((2*np.pi)**mu.shape[0] * det(cov)))
    
    return _evaluate, _gradient


Animate_GradientDescent(*negative_gaussian(cov=[5,10]*np.eye(2)),
                        init=np.array([-4.8,-4.8]), 
                        eta= lambda t: 300, 
                        delta=1e-2, 
                        axis_range=np.linspace(-5, 5, 50))

In [None]:
def non_convex_function():
    def _evaluate(x: np.ndarray):
        x = np.stack(x, axis=0)
        z = np.sin(x[:, 0] * x[:, 1]) / np.sqrt(x[:, 0]**2 + x[:, 1]**2)

        return np.array([[z]]) if np.isscalar(z) else z

    
    def _gradient(x: np.ndarray):
        X, Y = x[0], x[1]
        a = np.array([(Y*np.cos(X*Y)*(X**2 + Y**2) - X*np.sin(X*Y)) / (X**2 + Y**2)**(1.5),
                     (X*np.cos(X*Y)*(X**2 + Y**2) - Y*np.sin(X*Y)) / (X**2 + Y**2)**(1.5)])
        return a
    
    return _evaluate, _gradient


Animate_GradientDescent(*non_convex_function(),
                        init=np.random.uniform(-5,5,2),
                        eta= lambda t: 2*.1, 
                        delta=1e-3, 
                        axis_range=np.linspace(-5, 5, 50))

## Stochastic Gradient Descent

Below is a naive implementation of the stochastic gradient descent, recieving a "module" to minimize and a batch size

In [None]:
def VanillaStochasticGradientDescent(module, init=np.random.uniform(-1, 1, 2), eta=lambda t: .1, tol=1e-5, batch_size=5):
    steps, delta = [init], tol

    t = 1
    while delta >= tol:
        # Sample data for current iteration
        ids = module.sample_batch(batch_size)
       
        # Calculate iteration elements
        g, eta_t = module.gradient(steps[-1], samples = ids), eta(t)
        step = steps[-1] - eta_t * g

        steps.append(step)
        delta = np.sum((steps[-1] - steps[-2])**2)**.5
        t += 1
        
    return np.array(steps)

The RSS module consists of `evaluate`, `gradient` and `sample_batch` functions. To enbable the SGD descent behave like the GD, in the case a batch size is not passed the `sample_batch` returns $0,1,\ldots,n\_samples$.

In [None]:
class RSS:
    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X, self.y = X, y
        
    def evaluate(self, w: np.ndarray, samples: np.ndarray = None):
        if samples is None:
            samples = np.arange(self.X.shape[0])
        
        X, y = self.X[samples, :], self.y[samples]
        
        Y = np.broadcast_to(y[..., np.newaxis], (y.shape[0], w.shape[0]))
        return np.sum( (X @ w.T - Y)**2, axis=0)
    
    def gradient(self, w: np.ndarray, samples: np.ndarray = None):
        if samples is None:
            samples = np.arange(self.X.shape[0])
        return 2 * self.X[samples,:].T @ (self.X[samples,:] @ w.T - self.y[samples])
    
    def sample_batch(self, n:int=None):
        if n is None:
            return np.arange(self.X.shape[0])
        return np.random.randint(self.X.shape[0], size=n)

In [None]:
# Generate data according to the linear regression with Gaussian noise assumptions
np.random.seed(0)
n = 100
w = np.array([5,-2])
X = np.c_[np.random.uniform(low=-3, high=3, size=(n, 1)), np.ones((n, 1))]
y = X @ w + np.random.normal(0, 5, size=(n,))

module = RSS(X, y)
vals = np.linspace(-30, 30, 100)
contour = function_contour(module.evaluate, vals)

eta, init = lambda t: .001, np.array([-20,-20])
gd_steps = VanillaStochasticGradientDescent(module, eta=eta, init=init, batch_size=None)
sgd_steps = VanillaStochasticGradientDescent(module, eta=eta, init=init, batch_size=3)

fig = make_subplots(rows=1, cols=2, 
                    subplot_titles = (r"$\text{Gradient Descent}$", 
                                      r"$\text{Stochastic Gradient Descent}$"))\
    .add_traces([go.Scatter(x=gd_steps[:,0], y=gd_steps[:,1], mode = "markers+lines", showlegend=False, marker_color="black"),
                 go.Scatter(x=sgd_steps[:,0], y=sgd_steps[:,1], mode = "markers+lines", showlegend=False, marker_color="black"),
                 contour,contour], rows=[1]*4, cols=[1,2,1,2])\
    .update_xaxes(range=[vals[0],vals[-1]])\
    .update_yaxes(range=[vals[0],vals[-1]])\
    .update_layout(width=800, height=400)

fig.write_image(f"../figures/rss_gd_sgd.png")
# fig.show()