In [1]:
import warnings

import numpy as np
import pandas as pd

from itertools import product
from perlin_numpy import generate_perlin_noise_2d
from scipy.interpolate import griddata
from scipy.stats import percentileofscore
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures



In [2]:
# Seed NumPy's global RNG so repeated runs draw the same random numbers
np.random.seed(1)

# Per-iteration sampling budget:
#   m -> physical observations per iteration (default 5)
#   n -> simulated observations per iteration (default 100)
m, n = 5, 100

# How many observations are gathered up front, before the active learning loop starts
pretrain_n = 1

# How many times the active learning loop is executed (default 10)
iterations = 10

# Lower percentile bound for exploration in each iteration
percentile = 50

# Small corrective constant applied when computing percentages
laplace_alpha = 0.01

# Degree of the polynomial fitted to the data
degree = 2

In [None]:
# Active-learning run for a single objective function.
#
# Reads from earlier cells: objectives, obj, ran_err, sys_err and the
# configuration constants (m, n, pretrain_n, percentile, iterations,
# laplace_alpha, degree).
#
# Index convention used throughout this cell:
#   * a sample with grid indices (i, j) sits at x = x_axis[i], y = y_axis[j];
#   * np.meshgrid (default indexing="xy") lays the truth out as
#     Z[row, col] = func(x_axis[col], y_axis[row]), so the true value of
#     sample (i, j) is Z[j, i];
#   * interpolated surfaces are evaluated on (X.T, Y.T), so z[i, j] is the
#     estimate at (x_axis[i], y_axis[j]).

# Get the bounds for the function: ((x_min, x_max), (y_min, y_max))
xy_range = objectives[obj]["bounds"]

# Number of grid points along each axis
grid_n = 100

# Generate the meshgrid for the function.
# linspace(..., endpoint=False) gives exactly grid_n points with spacing
# (max - min) / grid_n; np.arange with a fractional step can return one point
# more or fewer than intended because of floating-point rounding.
X = np.linspace(*xy_range[0], num=grid_n, endpoint=False)
Y = np.linspace(*xy_range[1], num=grid_n, endpoint=False)
X, Y = np.meshgrid(X, Y)
Z = objectives[obj]["func"](X, Y)  # True function values on the grid

# Get the range of the function (used later as interpolation fill values)
z_range = (np.floor(np.min(Z)) - 1, np.ceil(np.max(Z)) + 1)


def _observe(samples):
    """Add "x", "y" and a noisy "z" column to a frame of grid indices.

    Parameters
    ----------
    samples : pandas.DataFrame
        Frame with integer columns "i" (x-axis index) and "j" (y-axis index).
        It is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same frame, for convenient chaining.
    """
    i = samples["i"].to_numpy()
    j = samples["j"].to_numpy()
    samples["x"] = X[0, i]
    samples["y"] = Y[j, 0]
    # Rows of Z follow y and columns follow x, hence Z[j, i] (not Z[i, j]).
    # ran_err is defined elsewhere; presumably it returns one noise value per
    # sample at the given scale -- TODO confirm.
    samples["z"] = Z[j, i] + ran_err(len(samples), 0.05)
    return samples


def _fit_poly_model(samples):
    """Fit a polynomial regression of "z" on ("x", "y").

    Parameters
    ----------
    samples : pandas.DataFrame
        Frame with columns "x", "y" and "z".

    Returns
    -------
    tuple
        (expanded polynomial feature matrix, fitted LinearRegression).
    """
    features = PolynomialFeatures(degree=degree, include_bias=False).fit_transform(samples[["x", "y"]])
    return features, LinearRegression().fit(features, samples["z"])


# Generate the initial training data: pretrain_n random grid points with noisy observations
df = _observe(pd.DataFrame(np.random.randint(grid_n, size=(pretrain_n, 2)), columns=["i", "j"]))

# Keep a copy; simul_df accumulates the data the polynomial model is fitted on
simul_df = df.copy()

# Fit the polynomial model to the initial data
poly_features, poly_model = _fit_poly_model(simul_df)

# Generate m fresh observations; df accumulates the data for the final GP
df = _observe(pd.DataFrame(np.random.randint(grid_n, size=(m, 2)), columns=["i", "j"]))

# Active learning loop
# Purpose: iteratively pick new observation points, guided by a GP fitted to
# simulated data drawn from the current polynomial model.
for idx in range(1, iterations + 1):
    # Fresh GP each iteration: RBF kernel, targets not normalised, fixed
    # random_state for reproducibility, alpha added to the kernel diagonal
    # to improve numerical stability.
    krnl = RBF(length_scale=1)
    model = GaussianProcessRegressor(kernel=krnl, normalize_y=False, random_state=3, alpha=0.001)

    # Randomly sample n grid points within the function's bounds
    sim_samples = pd.DataFrame(np.random.randint(grid_n, size=(n, 2)), columns=["i", "j"])

    # Generate simulated data from the polynomial model.
    # sys_err is defined elsewhere; presumably it evaluates poly_model at
    # (x, y) and adds a systematic error -- TODO confirm.
    sim_samples["x"] = X[0, sim_samples["i"]]
    sim_samples["y"] = Y[sim_samples["j"], 0]
    sim_samples["z"] = sys_err(n, sim_samples["x"], sim_samples["y"], poly_model)

    # Fit the GP to the simulated data and predict back at the same points
    model.fit(sim_samples[["x", "y"]], sim_samples["z"])
    pred = model.predict(sim_samples[["x", "y"]])

    # Interpolate the predictions onto the grid; z[i, j] is the estimate at
    # (x_axis[i], y_axis[j]). Points outside the convex hull of the samples
    # receive the fill value z_range[0].
    z = griddata((sim_samples["x"], sim_samples["y"]), pred, (X.T, Y.T), method="linear", fill_value=z_range[0])

    # Candidate set: every grid point, each with a noisy observation of the true function
    tmp_df = _observe(pd.DataFrame(list(product(range(grid_n), range(grid_n))), columns=["i", "j"]))

    # Construct the probability distribution to sample candidates from:
    # shift the surface so its minimum is 0, zero out everything at or below
    # the chosen percentile, then add laplace_alpha so every point keeps a
    # non-zero probability.
    mag = z[tmp_df["i"], tmp_df["j"]]
    p = mag - z.min()
    p = np.where(p > np.percentile(p, percentile), p, 0) + laplace_alpha
    p /= p.sum()
    tmp_df = tmp_df.loc[np.random.choice(tmp_df.index, size=m, p=p, replace=False)]

    # Add the m newly selected observations to both data sets
    df = pd.concat([df, tmp_df], ignore_index=True)
    simul_df = pd.concat([simul_df, tmp_df], ignore_index=True)

    # Refit the polynomial model on the updated data
    poly_features, poly_model = _fit_poly_model(simul_df)

# Refit the GP on all observations collected in df
krnl = RBF(length_scale=1)
model = GaussianProcessRegressor(kernel=krnl, normalize_y=False, random_state=3, alpha=0.001)

model.fit(df[["x", "y"]], df["z"])

# Predict at the observed points
pred = model.predict(df[["x", "y"]])

# Interpolate the predictions onto the grid
z = griddata((df["x"], df["y"]), pred, (X.T, Y.T), method="linear", fill_value=(z_range[0] + 1))

# Report performance: percentile rank, among all true grid values, of the true
# value at the location where the interpolated prediction is largest.
# z is indexed [i, j] while Z is indexed [j, i], so the indices are swapped.
best_i, best_j = np.unravel_index(z.argmax(), z.shape)
print(obj, percentileofscore(Z.flatten(), Z[best_j, best_i]), sep="\t", end="\r\n")