<a href="https://colab.research.google.com/github/gmshashank/Gaussian_Processes/blob/main/Gaussian_Processes_Python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

[Coding gaussian process regressors FROM SCRATCH in python
](https://www.youtube.com/watch?v=JXdrq7--XV0)

 This notebook contains a complete implementation of a Gaussian Process Regressor (GPR) with a squared exponential kernel using Numpy for matrix operations and Plotly for visualisation. The purpose of this notebook is to understand the main components of a GPR and to visualise the effects of certain parameters on the output. 
It is not intended as a theoretical introduction to GPRs.
A good introduction can be found [here](http://www.gaussianprocess.org/gpml/chapters/RW.pdf). Stable and effective implementations of GPR's are available, for example, in the [gpytorch](https://gpytorch.ai/) package.

## Packages

In [2]:
!pip install --upgrade plotly

Collecting plotly
  Downloading plotly-5.7.0-py2.py3-none-any.whl (28.8 MB)
[K     |████████████████████████████████| 28.8 MB 2.4 MB/s 
Installing collected packages: plotly
  Attempting uninstall: plotly
    Found existing installation: plotly 5.5.0
    Uninstalling plotly-5.5.0:
      Successfully uninstalled plotly-5.5.0
Successfully installed plotly-5.7.0


In [1]:
from google.colab import output
output.enable_custom_widget_manager()

In [3]:
import numpy as np
import plotly.graph_objects as go
from ipywidgets import interact,widgets

## Plotting Helper functions

 To have nice visualizations of the later GPR, we use Plotly and to have structured and lean code, we define a few commonly used 'helpers' here.

In [4]:
def update_layout_of_graph(fig: go.Figure,title: str = 'Plot')->go.Figure:
    fig.update_layout(
        width=800,
        height=600,
        autosize=False,
        plot_bgcolor='rgba(0,0,0,0)',
        title=title,
        
    )
    fig.update_layout(plot_bgcolor='rgba(0,0,0,0)',
                      xaxis_title = 'input values',
                      yaxis_title = 'output values',
                      legend=dict(yanchor="top",
                                  y=0.9,
                                  xanchor="right",
                                  x=0.95),
                      title={
                          'x': 0.5,
                          'xanchor': 'center'
                      })
    fig.update_xaxes(showline=True, linewidth=1, linecolor='black')
    fig.update_yaxes(showline=True, linewidth=1, linecolor='black')
    return fig

In [5]:
# plotting functions (mean)
def line_scatter(
    visible:bool=True,
    x_lines:np.array=np.array([]),
    y_lines:np.array=np.array([]),
    name_line:str="Predicted function",
    showlegend:bool=True,
    )->go.Scatter:

    return go.Scatter(
        visible=visible,
        line=dict(color="blue",width=2),
        x=x_lines,
        y=y_lines,
        name=name_line,
        showlegend=showlegend
    )

In [6]:
# plotting data points
def dot_scatter(
    visible:bool=True,
    x_dots:np.array=np.array([]),
    y_dots:np.array=np.array([]),
    name_dots:str="Observed points",
    showlegend:bool=True
    )->go.Scatter:

    return go.Scatter(
        x=x_dots,
        visible=visible,
        y=y_dots,
        mode="markers",
        name=name_dots,
        marker=dict(color="red",size=8),
        showlegend=showlegend
    )

In [7]:
# visualize the uncertainity as an area in graph
def uncertainity_area_scatter(
    visible:bool=True,
    x_lines:np.array=np.array([]),
    y_upper:np.array=np.array([]),
    y_lower:np.array=np.array([]),
    name:str="Mean plus/minus standard deviation",
    )->go.Scatter:

    return go.Scatter(
        visible=visible,
        x=np.concatenate((x_lines,x_lines[::-1])),    # x and x reversed
        y=np.concatenate((y_upper,y_lower[::-1])),    # upper and then lower reversed
        fill="toself",
        fillcolor="rgba(189,195,199,0.5)",
        line=dict(color="rgba(200,200,200,0)"),
        hoverinfo="skip",
        showlegend=True,
        name=name,
    )

In [8]:
def add_slider_GPR(figure: go.Figure, parameters):
    figure.data[0].visible = True
    figure.data[1].visible = True

    # Create and add slider
    steps = []
    for i in range(int((len(figure.data) - 1) / 2)):
        step = dict(
            method="update",
            label=f'{parameters[i]: .2f}',
            args=[{
                "visible": [False] * (len(figure.data) - 1) + [True]
            }],
        )
        step["args"][0]["visible"][2 *
                                   i] = True  # Toggle i'th trace to "visible"
        step["args"][0]["visible"][2 * i + 1] = True
        steps.append(step)

    sliders = [dict(
        active=0,
        pad={"t": 50},
        steps=steps,
    )]
    figure.update_layout(sliders=sliders, )
    return figure

In [9]:
def add_slider_to_function(figure:go.Figure, parameters):
    figure.data[0].visible = True

    # Create and add slider
    steps = []
    for i in range(len(figure.data)):
        step = dict(
            method="update",
            label=f'{parameters[i]: .2f}',
            args=[{
                "visible": [False] *len(figure.data) 
            }],
        )
        step["args"][0]["visible"][i] = True  # Toggle i'th trace to "visible"
        steps.append(step)

    sliders = [dict(
        active=0,
        pad={"t": 50},
        steps=steps,
    )]
    figure.update_layout(sliders=sliders, )
    return figure

## Gaussian Process implementation using SquaredExponential kernel

 In order to define a gaussian process regressor (GPR) we need a covariance function (also called kernel). The choice of this function will determine the 'shape' of the later GPR. In this notebook we choose the popular squared exponential kernel:
$$ k(x_1,x_2):= \sigma^2\exp(-\|x_1-x_2\|^2_2)/(2l^2))$$
with $$l>0$$ the lengthscale and $$\sigma^2>0$$ the signal variance. 
You are encouraged to implement a different kernel and see the difference in the resulting GPR!

In [10]:
# Covariance function --> Squareed Exponential kernel

class SquaredExponentialKernel:
  def __init__(self,sigma_f:float=1,length:float=1):
    self.sigma_f=sigma_f
    self.length=length
  
  def __call__(self,argument_1:np.array,argument_2:np.array)->float:
    return float(self.sigma_f*np.exp(-1*(np.linalg.norm(argument_1-argument_2)**2)/(2*self.length**2)))

In [12]:
x_lines = np.arange(-10, 10, 0.1)
kernel = SquaredExponentialKernel(length=1)

fig0 = go.FigureWidget(data=[
    line_scatter(
        x_lines=x_lines,
        y_lines=np.array([kernel(x, 0) for x in x_lines]),
    )
])

fig0 = update_layout_of_graph(fig0, title='Squared exponential kernel')


@interact(length=(0.1, 3, 0.1), argument_2=(-10, 10, 0.1))
def update(length=1, argument_2=0):
    with fig0.batch_update():
        kernel = SquaredExponentialKernel(length=length)
        fig0.data[0].y = np.array([kernel(x, argument_2) for x in x_lines])


fig0

interactive(children=(FloatSlider(value=1.0, description='length', max=3.0, min=0.1), FloatSlider(value=0.0, d…

FigureWidget({
    'data': [{'line': {'color': 'blue', 'width': 2},
              'name': 'Predicted function'…

In [18]:
# Helper function to calculate covariance matrices
def cov_matrix(x1,x2,cov_function)->np.array:
  return np.array([[cov_function(a,b) for a in x1] for b in x2])

In [36]:
class GPR:
  def __init__(
      self,
      data_x:np.array,
      data_y:np.array,
      covariance_function=SquaredExponentialKernel(),
      white_noise_sigma:float=0
      ):
    self.white_noise_sigma=white_noise_sigma
    self.data_x=data_x
    self.data_y=data_y
    self.covariance_function=covariance_function
    self._memory=None
    self._inverse_of_covariance_matrix_of_input=np.linalg.inv(cov_matrix(data_x,data_x,covariance_function) + (self.white_noise_sigma + 3e-7)*np.identity(len(self.data_x)))

  def predict(self,at_values:np.array)->np.array:
    k_lower_left =cov_matrix(self.data_x,at_values,self.covariance_function)
    k_lower_right=cov_matrix(at_values,at_values,self.covariance_function)

    # Mean
    mean_at_values=np.dot(k_lower_left,np.dot(self.data_y,self._inverse_of_covariance_matrix_of_input.T).T).flatten()

    # Covariance
    cov_at_values=k_lower_right-np.dot(k_lower_left,np.dot(self._inverse_of_covariance_matrix_of_input,k_lower_left.T))

    # Adding values larger than epsilon to ensure we get a positive semi definite
    cov_at_values=cov_at_values+3e-7*np.ones(np.shape(cov_at_values)[0])

    var_at_values=np.diag(cov_at_values)

    self._memory={
        "mean":mean_at_values,
        "covariance_matrix":cov_at_values,
        "variance":var_at_values
    }
    return mean_at_values

In [37]:
# Initialize GPR on random training and visaulise it

x_values=np.array([0,0.3,1,3.1,4.7])
y_values=np.array([1,0,1.4,0,-0.9])

In [46]:
x = np.arange(-1,7,0.1)

In [55]:
# Plot the GPR

def plot_GPR(data_x,data_y,model,x,visible=True)->list:
  mean=model.predict(x)

  std=np.sqrt(model._memory["variance"])
  data=[]

  for i in range(1,4):
    data.append(
        uncertainity_area_scatter(
            x_lines=x,
            y_lower=mean-i*std,
            y_upper=mean+i*std,
            name=f"mean plus/minus {i}*standard deviation",
            visible=visible))
  data.append(line_scatter(x_lines=x,y_lines=mean,visible=visible))
  data.append(dot_scatter(x_dots=data_x,y_dots=data_y,visible=visible))
  return data

In [56]:
model=GPR(x_values,y_values)
data=plot_GPR(data_x=x_values,data_y=y_values,x=x,model=model)
fig=go.Figure(data=data)
fig=update_layout_of_graph(fig=fig,title="GPR with length 1, sigma= 0 and noise=0")
fig.show()

In [69]:
model = GPR(x_values, y_values)

mean = model.predict(x)
covariance_matrix = model._memory['covariance_matrix']

fig1 = go.FigureWidget(data=[dot_scatter(x_dots=x_values, y_dots=y_values)])
fig1 = update_layout_of_graph(
    fig1,
    title='Random drawings (i.e. random functions) of the Gaussian process')

button = widgets.Button(description='Add random drawing')


def update(_):
    with fig1.batch_update():
        fig1.add_trace(
            line_scatter(x_lines=x,
                         y_lines=np.random.multivariate_normal(
                             mean, covariance_matrix),
                         name_line='random function',
                         showlegend=False))
        fig1.add_trace(
            dot_scatter(x_dots=x_values, y_dots=y_values, showlegend=False))


fig1.show()

button.on_click(update)
widgets.VBox([fig1, button])


VBox(children=(FigureWidget({
    'data': [{'marker': {'color': 'red', 'size': 8},
              'mode': 'mark…