Copyright 2021-2023 Lawrence Livermore National Security, LLC and other MuyGPyS
Project Developers. See the top-level COPYRIGHT file for details.

SPDX-License-Identifier: MIT

In [None]:
import numpy as np
from typing import Callable

from MuyGPyS.gp.distortion import AnisotropicDistortion, l2
from MuyGPyS.gp.hyperparameter import ScalarHyperparameter
from MuyGPyS.neighbors import NN_Wrapper
import MuyGPyS._src.math as mm


np.random.seed(0)

def _test_mask(data_count, train_step):
    return np.mod(np.arange(data_count), train_step) != 0

def _train_mask(train_step):
    return slice(None, None, train_step)

def _get_length_scale_array(
    array_fn: Callable,
    target_shape: mm.ndarray,
    **length_scales,
) -> mm.ndarray:
    AnisotropicDistortion._lengths_agree(
        len(length_scales), target_shape[-1]
    )
    # make sure each length_scale array is broadcastable when its shape is (batch_count,)
    shape = (1,) * (len(target_shape) - 2) + (-1,)
    return array_fn(
        [mm.reshape(value, shape) for value in length_scales.values()]
    ).T

def _anisotropic_dist_fn(
    diffs: mm.ndarray, **length_scales
) -> mm.ndarray:
    length_scale_array = _get_length_scale_array(
        mm.array, diffs.shape, **length_scales
    )
    return l2(diffs / length_scale_array)



In [None]:
# 1D Univariate
print("1D")
data_count = 20#5001
train_step = 2#10
x = np.linspace(0, data_count, data_count)#1, data_count)
x = x.reshape(data_count, 1)
# print(f"features x {x.shape} {x}")
test_mask = _test_mask(data_count, train_step)
train_mask  = _train_mask(train_step)
test_features = x[test_mask, :]
train_features = x[train_mask, :]
print(f"train features {train_features.shape} {train_features}")
test_count, _ = test_features.shape
train_count, _ = train_features.shape

# 1D distance scaling
sim_length_scale0 = ScalarHyperparameter(0.1)   # HP distance scaling dim 0
sim_length_scale1 = ScalarHyperparameter(0.5)   # HP distance scaling dim 1
features_aniso_dist_scaled = _anisotropic_dist_fn(
    train_features,
    length_scale0=sim_length_scale0(),
)
features_aniso_dist_scaled = features_aniso_dist_scaled.reshape(train_count, 1)
print(f"train features scaled {features_aniso_dist_scaled.shape} {features_aniso_dist_scaled}")

# 1D Nearest Neighbors
nn_count = 3#30
batch_count = train_count
nbrs_lookup = NN_Wrapper( # sklearn Euclidean distance
    train_features,
    nn_count,
    nn_method="exact",
    algorithm="ball_tree")
if train_count > batch_count:
    batch_indices = mm.iarray(
        np.random.choice(train_count, batch_count, replace=False)
    )
else:
    batch_indices = mm.arange(train_count, dtype=mm.itype)
batch_nn_indices, batch_nn_dists = nbrs_lookup.get_batch_nns(batch_indices) # F2
print(f"indices {batch_nn_indices.shape} {batch_nn_indices}")
print(f"dists {batch_nn_dists.shape} {batch_nn_dists}")


In [None]:
# 2D Univariate
print("\n2D")
points_per_dim = 6#60
data_count = points_per_dim**2
train_step = 2#13
x = np.linspace(0, points_per_dim, points_per_dim)#1, points_per_dim)
xx, yy = np.meshgrid(x, x)
xs = np.array(
    [
        [xx[i, j], yy[i, j]]
        for i in range(points_per_dim)
        for j in range(points_per_dim)
    ]
)
# print(f"features x  {x.shape} {x}")
# print(f"features xx {xx.shape} {xx}")
# print(f"features yy {yy.shape} {yy}")
# print(f"features xs {xs.shape} {xs}")
test_mask = _test_mask(data_count, train_step)
train_mask  = _train_mask(train_step)
test_features = xs[test_mask, :]
train_features = xs[train_mask, :]
test_count, _ = test_features.shape
train_count, _ = train_features.shape
print(f"train features {train_features.shape} {train_features}")

# 2D distance scaling
sim_length_scale0 = ScalarHyperparameter(0.1)   # HP distance scaling dim 0
sim_length_scale1 = ScalarHyperparameter(0.5)   # HP distance scaling dim 1
scales = _get_length_scale_array(mm.array, train_features.shape, length_scale0=sim_length_scale0(), length_scale1=sim_length_scale1())
print (scales)
train_features_scaled = train_features / scales
print (train_features_scaled)

# 2D Nearest Neighbors
nn_count = 3#30
batch_count = train_count
nbrs_lookup = NN_Wrapper( # sklearn Euclidean distance
    train_features_scaled,
    nn_count,
    nn_method="exact",
    algorithm="ball_tree")
if train_count > batch_count:
    batch_indices = mm.iarray(
        np.random.choice(train_count, batch_count, replace=False)
    )
else:
    batch_indices = mm.arange(train_count, dtype=mm.itype)
batch_nn_indices, batch_nn_dists = nbrs_lookup.get_batch_nns(batch_indices) # F2
print(f"indices {batch_nn_indices.shape} {batch_nn_indices}")
print(f"dists {batch_nn_dists.shape} {batch_nn_dists}")
