# Gaussian process regression - hands on implementation

## Overview

This notebook contains a complete implementation of a Gaussian Process Regressor (GPR) with a squared exponential kernel using Numpy for matrix operations and Plotly for visualisation. 

The purpose of this notebook is to understand the main components of a GPR and to visualise the effects of certain parameters on the output. 
It is _not_ intended as a theoretical introduction to GPRs.
A good introduction can be found [here](http://www.gaussianprocess.org/gpml/chapters/RW.pdf). 

Stable and effective implementations of GPR's are available, for example, in the [gpytorch](https://gpytorch.ai) package.

## Packages

In [46]:
import numpy as np
import plotly.graph_objects as go
from ipywidgets import interact, widgets
import control

## Plotting helper functions

To have nice visualizations of the later GPR, we use Plotly and to have structured and lean code, we define a few commonly used 'helpers' here. 

In [10]:
def update_layout_of_graph(fig: go.Figure,title: str = 'Plot')->go.Figure:
    fig.update_layout(
        width=800,
        height=600,
        autosize=False,
        plot_bgcolor='rgba(0,0,0,0)',
        title=title,
        
    )
    fig.update_layout(plot_bgcolor='rgba(0,0,0,0)',
                      xaxis_title = 'input values',
                      yaxis_title = 'output values',
                      legend=dict(yanchor="top",
                                  y=0.9,
                                  xanchor="right",
                                  x=0.95),
                      title={
                          'x': 0.5,
                          'xanchor': 'center'
                      })
    fig.update_xaxes(showline=True, linewidth=1, linecolor='black')
    fig.update_yaxes(showline=True, linewidth=1, linecolor='black')
    return fig

In [11]:
def line_scatter(
    visible: bool = True,
    x_lines: np.array = np.array([]),
    y_lines: np.array = np.array([]),
    name_line: str = 'Predicted function',
    showlegend: bool = True,
) -> go.Scatter:
    # Adding the lines
    return go.Scatter(
        visible=visible,
        line=dict(color="blue", width=2),
        x=x_lines,
        y=y_lines,
        name=name_line,
        showlegend= showlegend
    )

In [12]:
def dot_scatter(
    visible: bool = True,
    x_dots: np.array = np.array([]),
    y_dots: np.array = np.array([]),
    name_dots: str = 'Observed points',
    showlegend: bool = True
) -> go.Scatter:
    # Adding the dots
    return go.Scatter(
        x=x_dots,
        visible=visible,
        y=y_dots,
        mode="markers",
        name=name_dots,
        marker=dict(color='red', size=8),
        showlegend=showlegend
    )

In [13]:
def uncertainty_area_scatter(
        visible: bool = True,
        x_lines: np.array = np.array([]),
        y_upper: np.array = np.array([]),
        y_lower: np.array = np.array([]),
        name: str = "mean plus/minus standard deviation",
) -> go.Scatter:

    return go.Scatter(
        visible=visible,
        x=np.concatenate((x_lines, x_lines[::-1])),  # x, then x reversed
        # upper, then lower reversed
        y=np.concatenate((y_upper, y_lower[::-1])),
        fill='toself',
        fillcolor='rgba(189,195,199,0.5)',
        line=dict(color='rgba(200,200,200,0)'),
        hoverinfo="skip",
        showlegend=True,
        name= name,
    )

In [14]:
def add_slider_GPR(figure: go.Figure, parameters):
    figure.data[0].visible = True
    figure.data[1].visible = True

    # Create and add slider
    steps = []
    for i in range(int((len(figure.data) - 1) / 2)):
        step = dict(
            method="update",
            label=f'{parameters[i]: .2f}',
            args=[{
                "visible": [False] * (len(figure.data) - 1) + [True]
            }],
        )
        step["args"][0]["visible"][2 *
                                   i] = True  # Toggle i'th trace to "visible"
        step["args"][0]["visible"][2 * i + 1] = True
        steps.append(step)

    sliders = [dict(
        active=0,
        pad={"t": 50},
        steps=steps,
    )]
    figure.update_layout(sliders=sliders, )
    return figure

In [15]:
def add_slider_to_function(figure:go.Figure, parameters):
    figure.data[0].visible = True

    # Create and add slider
    steps = []
    for i in range(len(figure.data)):
        step = dict(
            method="update",
            label=f'{parameters[i]: .2f}',
            args=[{
                "visible": [False] *len(figure.data) 
            }],
        )
        step["args"][0]["visible"][i] = True  # Toggle i'th trace to "visible"
        steps.append(step)

    sliders = [dict(
        active=0,
        pad={"t": 50},
        steps=steps,
    )]
    figure.update_layout(sliders=sliders, )
    return figure

## Implementation of GPR with squared exponential kernel

In order to define a gaussian process regressor (GPR) we need a covariance function (also called kernel). The choice of this function will determine the 'shape' of the later GPR. 

In this notebook we choose the popular _squared exponential_ kernel:
$$ k(x_1,x_2):= \sigma^2*\exp(-\|x_1-x_2\|^2_2)/(2*l^2))$$
with $$l>0$$ the lengthscale and $$\sigma^2>0$$ the signal variance. 
You are encouraged to implement a different kernel and see the difference in the resulting GPR!



In [31]:
class SquaredExponentialKernel:
    def __init__(self, sigma_f: float, diag_values: np.array):
        self.sigma_f = sigma_f
        self.diag_values = diag_values
        self._S = np.diag(diag_values)

    def __call__(self, argument_1: np.ndarray, argument_2: np.ndarray):
        diff = argument_1-argument_2
        S_inv = np.linalg.inv(self._S)
        exponent = -0.5 * np.dot(diff.T, np.dot(S_inv, diff))
        return float(self.sigma_f ** 2 * np.exp(exponent))

Let us visualize this kernel.

In [35]:
x_lines = np.arange(-10, 10, 0.1)

diag_values = np.array([1])
kernel = SquaredExponentialKernel(sigma_f = 1.0, diag_values=diag_values)

fig0 = go.FigureWidget(data=[
    line_scatter(
        x_lines=x_lines,
        y_lines=np.array([kernel(np.array([x]), np.array([0])) for x in x_lines]),
    )
])

fig0 = update_layout_of_graph(fig0, title='Squared exponential kernel')


def update(length=0.1, argument_2=0.0):
    S = np.array([[length ** 2]])  # Recompute S for each slider value
    kernel = SquaredExponentialKernel(sigma_f=1.0, S=S)
    with fig0.batch_update():
        fig0.data[0].y = np.array([
            kernel(np.array([x]), np.array([argument_2])) for x in x_lines
        ])

fig0

FigureWidget({
    'data': [{'line': {'color': 'blue', 'width': 2},
              'name': 'Predicted function',
              'showlegend': True,
              'type': 'scatter',
              'uid': 'e3cfd2dd-7f9d-4705-a0a1-fbe77f4b288d',
              'visible': True,
              'x': {'bdata': ('AAAAAAAAJMDNzMzMzMwjwJqZmZmZmS' ... 'ZmZiNAcpmZmZmZI0CkzMzMzMwjQA=='),
                    'dtype': 'f8'},
              'y': {'bdata': ('Pwh+VH0lbTsp5QpwUbWDO4v/ANc0Y5' ... 'FLfbE7vxMB1zRjmju29ApwUbWDOw=='),
                    'dtype': 'f8'}}],
    'layout': {'autosize': False,
               'height': 600,
               'legend': {'x': 0.95, 'xanchor': 'right', 'y': 0.9, 'yanchor': 'top'},
               'plot_bgcolor': 'rgba(0,0,0,0)',
               'template': '...',
               'title': {'text': 'Squared exponential kernel', 'x': 0.5, 'xanchor': 'center'},
               'width': 800,
               'xaxis': {'linecolor': 'black', 'linewidth': 1, 'showline': True, 'title': {'tex

In [18]:
print(np.finfo(float).eps)
# 2.22044604925e-16

print(np.finfo(np.float32).eps)
# 1.19209e-07

2.220446049250313e-16
1.1920929e-07


In [19]:
# Helper function to calculate the respective covariance matrices
def cov_matrix(x1, x2, cov_function) -> np.array:
    return np.array([[cov_function(a, b) for a in x1] for b in x2])

**The GP**

In [45]:
class GPR:
    def __init__(self,
                 data_x: np.array,
                 data_y: np.array,
                 covariance_function=SquaredExponentialKernel(sigma_f=1, diag_values=np.array([1])),
                 white_noise_sigma: float = 0):
        self.noise = white_noise_sigma
        self.data_x = data_x
        self.data_y = data_y
        self.covariance_function = covariance_function

        # Store the inverse of covariance matrix of input (+ machine epsilon on diagonal) since it is needed for every prediction
        self._inverse_of_covariance_matrix_of_input = np.linalg.inv(
            cov_matrix(data_x, data_x, covariance_function) +
            (3e-7 + self.noise) * np.identity(len(self.data_x)))

        self._memory = None

    # function to predict output at new input values. Store the mean and covariance matrix in memory.

    def predict(self, at_values: np.array) -> np.array:
        k_lower_left = cov_matrix(self.data_x, at_values,
                                  self.covariance_function)
        k_lower_right = cov_matrix(at_values, at_values,
                                   self.covariance_function)

        # Mean.
        mean_at_values = np.dot(
            k_lower_left,
            np.dot(self.data_y,
                   self._inverse_of_covariance_matrix_of_input.T).T).flatten()

        # Covariance.
        cov_at_values = k_lower_right - \
            np.dot(k_lower_left, np.dot(
                self._inverse_of_covariance_matrix_of_input, k_lower_left.T))

        # Adding value larger than machine epsilon to ensure positive semi definite
        cov_at_values = cov_at_values + 3e-7 * np.ones(
            np.shape(cov_at_values)[0])

        var_at_values = np.diag(cov_at_values)

        self._memory = {
            'mean': mean_at_values,
            'covariance_matrix': cov_at_values,
            'variance': var_at_values
        }
        return mean_at_values

**The dynamics**

In [None]:
def pendulum_dynamics(x, u, dt):
    g = 9.81
    l = 1.0
    theta, theta_dot = x
    theta_ddot = (g / l) * np.sin(theta) + u
    theta_dot += theta_ddot * dt 
    theta += theta_dot * dt 
    return np.array([theta, theta_dot])


**The LQR calculation**

In [51]:
def calculate_lqr(A_n, B_n, Q, R):
    K, S, E  = control.lqr(A_n, B_n, Q, R)
    return K


**The weights**

In [None]:
def design_weights(theta):
    W_x = np.diag([1, 50*theta[0], 10, 50 * theta[1]])
    W_u = 10
    return W_x, W_u

**The underlying model**

In [None]:
def define_model():
    M = 1.0 
    m = 0.1  
    l = 0.5 
    g = 9.81
    dt = 0.02
    A = np.array([
        [1.0, dt, 0, 0],
        [0, 1.0, -g * m / M, 0],
        [0, 0, 1.0, dt],
        [0, 0, g * (M + m) / (l * M), 1.0]
    ])

    B = np.array([
        [0],
        [dt / M],
        [0],
        [-dt / (l * M)]
    ])
    return A, B

**The cost function**

In [None]:
def cost(K: int, x: np.ndarray, u: np.ndarray, theta, Q: np.ndarray = np.diag([1, 100, 10, 200]), R: np.ndarray = [10]):
    A, B = define_model()
    W_x, W_u = design_weights(theta)
    K = calculate_lqr(A, B, W_x, W_u )
    sum = 0
    for k in range(K):
        sum+= np.dot(np.dot(x[k].T, Q), x[k]) + np.dot(np.dot(u[k].T, R), u[k])
    return 1/K * sum