# Sum of Gaussians

Presentation of the neural model supported by kernel regression on sum of Gaussians dataset.

*Note*: to see how dataset was generated, go to `dataset.ipynb`.

In [1]:
import sys

sys.path.append("..")  # we run from subdirectory, so to access sources append repo root to path

In [2]:
import pandas as pd
import seaborn as sns
import torch
from pydentification.data.datamodules.simulation import SimulationDataModule
from pydentification.models.modules.feedforward import TimeSeriesLinear
from sklearn import metrics

from src.nonparametric import kernels
from src.nonparametric.memory import ExactMemoryManager
from src.training.module import BoundedSimulationTrainingModule
from src.training.utils import unbatch

In [3]:
sns.set()

# Dataset

In [4]:
data_path = r"../data/csv/sum-of-gaussians.csv"
plot_path = r"../data/plots/sum-of-gaussians/"
model_path = r"../models/sum-of-gaussians-network-parameters.pt"

We do not plot this dataset, since it is 8 dimensional.

In [5]:
dataset = pd.read_csv(data_path)
dataset.head(3)

Unnamed: 0,x1,x2,x3,x4,x5,x6,x7,x8,y
0,-0.752315,0.965348,-0.449306,0.954614,0.110536,0.730278,-0.026545,-0.656438,0.018566
1,-0.563668,0.588315,0.32472,-0.769585,0.258406,-0.39856,0.144918,0.604386,0.010243
2,-0.067591,0.946457,0.700753,0.36382,0.145539,-0.792149,0.84495,0.284326,0.004732


In [6]:
train_size = 10_000
test_size = len(dataset) - train_size

In [7]:
# we use trick to generate static data using data-module for simulation
# the time and system dimension will be swapped to keep implementation the same
dm = SimulationDataModule.from_csv(
    dataset_path=data_path,
    input_columns=["x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8"],
    output_columns=["y"],
    test_size=test_size,
    batch_size=32,  # used for prediction, we will not train network here
    validation_size=0.0,  # no need for validation
    shift=1,
    forward_input_window_size=1,
    forward_output_window_size=1,
    forward_output_mask=0,
)

In [8]:
# setup the data for prediction
dm.setup("fit")
dm.setup("predict")

In [9]:
for x, y in dm.train_dataloader():
    print(x.shape, y.shape)
    break  # system is 8 dimensional with single time-step

torch.Size([32, 1, 8]) torch.Size([32, 1, 1])


In [10]:
for x, y in dm.test_dataloader():
    print(x.shape, y.shape)
    break

torch.Size([32, 1, 8]) torch.Size([32, 1, 1])


# Model

Create the model from neural network we have trained before.

The settings for kernel regression are selected using hyper-parameter search, which are the best we found for this problem.

In [11]:
network = torch.nn.Sequential(
    TimeSeriesLinear(n_input_time_steps=1, n_input_state_variables=8, n_output_time_steps=1, n_output_state_variables=16),
    torch.nn.ReLU(),
    TimeSeriesLinear(n_input_time_steps=1, n_input_state_variables=16, n_output_time_steps=1, n_output_state_variables=16),
    torch.nn.ReLU(),
    TimeSeriesLinear(n_input_time_steps=1, n_input_state_variables=16, n_output_time_steps=1, n_output_state_variables=16),
    torch.nn.ReLU(),
    TimeSeriesLinear(n_input_time_steps=1, n_input_state_variables=16, n_output_time_steps=1, n_output_state_variables=16),
    torch.nn.ReLU(),
    TimeSeriesLinear(n_input_time_steps=1, n_input_state_variables=16, n_output_time_steps=1, n_output_state_variables=1),
)

In [12]:
network.load_state_dict(torch.load(model_path, weights_only=True))

<All keys matched successfully>

In [13]:
model = BoundedSimulationTrainingModule(
    network=network,
    optimizer=torch.optim.Adam(network.parameters()),  # will not be used anyway
    lr_scheduler=None,
    bound_during_training=False,
    bound_crossing_penalty=0.0,
    bandwidth=0.91,  # using kernel size 0.9 generate single NaN, so we set it to 0.91
    kernel=kernels.box_kernel,
    memory_manager=ExactMemoryManager(),  # dataset is low-dimensional, so no need to use approximated nearest neighbours here
    lipschitz_constant=0.25,  # known
    delta=0.1,  # user defined
    noise_variance=0.05,  # we know the variance from dataset generation
    k_neighbours=32,
    power=2,
    radius=None,
    memory_device="cpu",
    predict_device="cpu",
)

In [14]:
# unbatch the dataset to prepare memory manager
x, y = unbatch(dm.train_dataloader())
x.shape, y.shape

(torch.Size([10000, 1, 8]), torch.Size([10000, 1, 1]))

In [15]:
model.prepare(x, y)

# Test

Run the predictions with trained network and kernel regression.

In [16]:
outputs = model.predict_dataloader(dm.test_dataloader())

In [19]:
def range_ratio_error(error, y_true):
    return error / (y_true.max() - y_true.min())

def report(predictions, targets):
    rmse_network = metrics.root_mean_squared_error(y_true=targets, y_pred=predictions["network_predictions"].numpy().flatten())
    rmse_nonparametric = metrics.root_mean_squared_error(y_true=targets, y_pred=predictions["nonparametric_predictions"].numpy().flatten())
    rmse_bound = metrics.root_mean_squared_error(y_true=targets, y_pred=predictions["lower_bound"].numpy().flatten())

    print(f"RMSE NET:    {rmse_network:.4f}")
    print(f"RMSE KRE:    {rmse_nonparametric:.4f}")
    print(f"RMSE BOUNDS: {rmse_bound:.4f}", end="\n\n")
    print(f"RRR NET:     {range_ratio_error(error=rmse_network, y_true=targets):.2%}")
    print(f"RRR KRE:     {range_ratio_error(error=rmse_nonparametric, y_true=targets):.2%}")
    print(f"RRR BOUNDS:  {range_ratio_error(error=rmse_bound, y_true=targets):.2%}")

In [20]:
report(outputs, dataset["y"].iloc[train_size:].values)

RMSE NET:    0.0109
RMSE KRE:    0.0147
RMSE BOUNDS: 0.2723

RRR NET:     6.54%
RRR KRE:     8.87%
RRR BOUNDS:  163.99%
