### Investigate the error introduced by the lookup table for the **angle error**

In [1]:
import os
import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from tqdm.notebook import tqdm

### Import the `lib` directory
import pathlib
import sys

# Make the modules inside the `lib` directory importable from this notebook.
# `parents[1]` is the directory two levels above the current working directory.
repo_directory = pathlib.Path().resolve().parents[1]
lib_module_dir = str(repo_directory / "lib")
if lib_module_dir not in sys.path:
    # Put it first so these modules take precedence over same-named installed packages
    sys.path.insert(0, lib_module_dir)

Make sure you pick a training directory from the 2D GP function and not from the 1D scale function.

In [17]:
# Placeholder: fill in the path of a directory produced by a model training run before running the notebook.
# The cells below read the `error_models` and `params` sub-directories from it.
training_dir = ""

# Stop right here with instructions as long as the placeholder above has not been filled in.
assert training_dir != "", (
    "You should pick a directory generated by a model training. These are not uploaded to repository so you need to"
    " generate them yourself by running `angle_error.ipynb`. To save time, you can reduce the number of epochs."
)

### 1. Load the **error model**, i.e., **lookup table**

In [19]:
from error_model import ErrorModel

In [20]:
# Saved error models (lookup tables) are expected in the `error_models` sub-directory of the training directory
error_models_path = os.path.join(training_dir, "error_models")

# The directory has to exist and must contain at least one entry
has_error_models = os.path.isdir(error_models_path) and bool(os.listdir(error_models_path))
assert has_error_models, "No error models found"

In [21]:
def _epoch_from_filename(filename: str) -> int:
    """
    Extract the epoch number from a file name of the form `error_model_<epoch>.pickle`.

    Raises:
        ValueError: if the text left after removing the prefix and the extension is not an integer.
    """
    return int(filename.replace("error_model_", "").replace(".pickle", ""))


# Map every usable error model file to its epoch. Files without a valid epoch number are skipped.
epoch_by_file = {}
for _candidate in os.listdir(error_models_path):
    try:
        epoch_by_file[_candidate] = _epoch_from_filename(_candidate)
    except ValueError:
        continue

if not epoch_by_file:
    raise ValueError(f"A valid epoch number could not be obtained for any error model file in {error_models_path}")

# Pick the latest epoch NUMERICALLY. A plain lexicographic sort of the file names would rank
# "error_model_900.pickle" after "error_model_1000.pickle" and silently load an older model.
# The file name is only a tie-breaker to keep the choice deterministic.
error_model_file = max(epoch_by_file, key=lambda name: (epoch_by_file[name], name))
error_model_filepath = os.path.join(error_models_path, error_model_file)
epoch_str = error_model_file.replace("error_model_", "").replace(".pickle", "")
epoch = epoch_by_file[error_model_file]

print(f"Loading the error model from epoch {epoch}...")

# NOTE(review): `pickle.load` can execute arbitrary code. Only load files generated by your own training runs.
with open(error_model_filepath, "rb") as f:
    error_model = pickle.load(f)

print("Error model successfully load:")
print(error_model)

Loading the error model from epoch 1000...
Error model successfully load:
<class 'error_model.ErrorModel'>
error                    : longitudinal error rate
param1                   : acceleration
param2                   : velocity
ndim                     : 2
x0 boundaries            : (-0.25, 0.25)
x1 boundaries            : (3.602107365926208e-05, 0.5)



### 2. Load the **GP**

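This is a bit of a hacky solution: the GP is rebuilt through `continue_training` only so that the saved parameters can be loaded and used for predictions. Make sure to use the same parameters and data that were used when training the model.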

In [22]:
from dual_gp_model_SVGP import DualGaussianProcessWrapper

In [23]:
# Look up the parameter file in the `params` sub-directory that belongs to the epoch of the loaded error model
params_file = f"epoch {epoch}.pickle"
params_dir = os.path.join(training_dir, "params")
params_filepath = os.path.join(params_dir, params_file)

params_file_exists = os.path.isfile(params_filepath)
assert params_file_exists, f"Parameters file does not exist for epoch {epoch}. Choose another error model."

In [24]:
# The data file is read from a `dependencies` directory located beside the current working directory
dependencies_folder = pathlib.Path().resolve().parent / "dependencies"
VelAcc = pd.read_csv(dependencies_folder / "VelAcc.csv")

# Inputs: one row per sample, columns ordered as (acceleration, velocity)
accelerations = VelAcc["accelerations"].to_numpy()
velocities = VelAcc["velocities"].to_numpy()
x_data = np.vstack((accelerations, velocities)).T

# Targets: the `long_dt_errors` column
y_data = VelAcc["long_dt_errors"].to_numpy()

In [25]:
# Rebuild the GP from the stored parameters; in this notebook the model is only used to make predictions
GP_model = DualGaussianProcessWrapper.continue_training(x_data=x_data, y_data=y_data, params_filepath=params_filepath)

Params loaded from Training at 2024-08-31 Saturday at 00.48u/params/epoch 1000.pickle...




### 3. Find the errors

The biggest error is most likely found at the center of each interpolation interval, so the lookup table is compared against the GP at the midpoints between neighbouring grid points.

In [26]:
def arrays_to_grid_arrays(x0_grid: np.ndarray, x1_grid: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """
    Expand two 1-D coordinate arrays into the coordinates of every point of the grid they span.

    Returns:
        Two flat arrays of equal length: the x0 coordinate and the x1 coordinate of each grid point.
        x0 varies fastest, i.e. the full x0 range is repeated for every value of x1.
    """
    mesh_x0, mesh_x1 = np.meshgrid(x0_grid, x1_grid)
    # Stacking first gives both rows a common dtype; each row is then returned separately
    stacked = np.vstack([mesh_x0.flatten(), mesh_x1.flatten()])
    return stacked[0], stacked[1]


def get_test_points(x0_grid: np.ndarray, x1_grid: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """
    Build the test points for a 2D grid, located halfway between the grid points.

    Three families of points are generated, in this order:
        1) halfway between neighbouring x0 grid values, on the x1 grid values
        2) on the x0 grid values, halfway between neighbouring x1 grid values
        3) halfway between neighbouring grid values along both x0 and x1

    Returns:
        Two flat arrays with the x0 and the x1 coordinates of all test points.
    """
    # Midpoint of every interval: left edge plus half the interval width
    x0_mid = x0_grid[:-1] + np.diff(x0_grid) / 2
    x1_mid = x1_grid[:-1] + np.diff(x1_grid) / 2

    axis_pairs = (
        (x0_mid, x1_grid),  # 1) between the x0 points
        (x0_grid, x1_mid),  # 2) between the x1 points
        (x0_mid, x1_mid),  # 3) between x0 and x1 points
    )
    point_sets = [arrays_to_grid_arrays(axis0, axis1) for axis0, axis1 in axis_pairs]

    # Each point set is a pair of rows; joining them side by side keeps x0 in row 0 and x1 in row 1
    return tuple(np.hstack(point_sets))


def split_up_GP_prediction(GP_model, x_test: np.ndarray, steps: int = 100) -> tuple[np.ndarray, np.ndarray]:
    """
    Divide the GP prediction over smaller batches to keep the CPU and memory usage limited.

    Args:
        GP_model: object with a `fast_predict_y(x)` method returning a (mean, variance) pair whose
            elements expose `.numpy()`.
        x_test: test points; split into batches along the first axis.
        steps: maximum number of batches (default 100, the value that used to be hard-coded).
            `np.array_split` raises a ValueError if it is not positive.

    Returns:
        The concatenated predicted mean and standard deviation (square root of the predicted variance)
        of all batches, in the order of `x_test`. Two empty arrays are returned for an empty `x_test`.
    """
    # Split once up front instead of re-splitting the full array in every iteration. Batches that
    # end up empty (fewer test points than `steps`) are dropped so the model is never called on them.
    batches = [batch for batch in np.array_split(x_test, steps) if len(batch) > 0]
    if not batches:
        return np.empty(0), np.empty(0)

    GP_mean_arrays, GP_std_arrays = [], []
    for x_test_part in tqdm(batches):
        values = GP_model.fast_predict_y(x_test_part)
        _GP_mean, _GP_var = [v.numpy().squeeze() for v in values]
        GP_mean_arrays.append(_GP_mean)
        GP_std_arrays.append(np.sqrt(_GP_var))

    # `np.hstack` also copes with the 0-d arrays that `squeeze` yields for single-point batches
    return np.hstack(GP_mean_arrays), np.hstack(GP_std_arrays)

In [27]:
# Test points derived from the grid of the lookup table, arranged with one point per row
x0_test, x1_test = get_test_points(*error_model.mean_interpolator.grid)
x_test = np.vstack((x0_test, x1_test)).T

print("Calculating the GP model predictions")
GP_mean, GP_std = split_up_GP_prediction(GP_model, x_test)

print("Calculating the error model predictions")
em_mean = error_model.mean_interpolator(x_test)
em_std = error_model.std_interpolator(x_test)

# Absolute difference between the GP and the lookup table at every test point
errors_mean = np.abs(GP_mean - em_mean)
errors_std = np.abs(GP_std - em_std)

Calculating the GP model predictions


  0%|          | 0/100 [00:00<?, ?it/s]

Calculating the error model predictions


In [28]:
# Largest difference between the GP and the lookup table over all test points
print(
    "Biggest errors:",
    f"\tmean error = {errors_mean.max()}",
    f"\tstd error = {errors_std.max()}",
    sep="\n",
)

# Average GP predictions, as a reference scale for the differences above
print(
    "Average values:",
    f"\tmean avg. = {GP_mean.mean()}",
    f"\tstd avg. = {GP_std.mean()}",
    sep="\n",
)

Biggest errors:
	mean error = 1.1845073390193139e-07
	std error = 3.0083279783932326e-08
Average values:
	mean avg. = 0.004350989080289418
	std avg. = 0.05397040806698706
