In [None]:
def select_next_query(X, y, method='PI', xi=0.1, kappa=2.0, grid_size=100, apply_scaling=False, title="Acquisition Function"):
    """
    Selects the next query point using a specified acquisition strategy.

    Parameters:
    - X: np.ndarray of shape (n_samples, 2), input features
    - y: np.ndarray of shape (n_samples,), output values
    - method: str, one of 'PI', 'EI', 'UCB'
    - xi: float, exploration parameter for PI/EI
    - kappa: float, exploration parameter for UCB
    - grid_size: int, resolution of the search grid

    Returns:
    - next_query: np.ndarray of shape (2,), the selected input point
    - acquisition_map: np.ndarray of shape (grid_size**2,), acquisition values
    - X_grid: np.ndarray of shape (grid_size**2, 2), grid points evaluated
    """
    if apply_scaling == True:
        scaler = StandardScaler()
        y_scaled = scaler.fit_transform(y.reshape(-1, 1)).ravel()
        y = y_scaled
        #X_min = X.min(axis=0)
        #X_max = X.max(axis=0)
        #X = (X - X_min) / (X_max - X_min)

    # Fit GP model
    kernel = RBF(length_scale=0.1, length_scale_bounds='fixed')
    gp = GaussianProcessRegressor(kernel=kernel, alpha=1e-10)
    gp.fit(X, y)

    # Create grid over [0,1] x [0,1]
    x1 = np.linspace(0, 1, grid_size)
    x2 = np.linspace(0, 1, grid_size)
    x1_grid, x2_grid = np.meshgrid(x1, x2)
    X_grid = np.column_stack([x1_grid.ravel(), x2_grid.ravel()])

    # Predict GP mean and std
    mean, std = gp.predict(X_grid, return_std=True)

    # Compute acquisition function
    if method == 'PI':
        y_max = np.max(y)
        z = (mean - y_max - xi) / (std + 1e-12)
        acquisition_map = norm.cdf(z)

    elif method == 'EI':
        y_max = np.max(y)
        z = (mean - y_max - xi) / (std + 1e-12)
        acquisition_map = (mean - y_max - xi) * norm.cdf(z) + std * norm.pdf(z)

    elif method == 'UCB':
        acquisition_map = mean + kappa * std

    else:
        raise ValueError("Unsupported method. Choose from 'PI', 'EI', or 'UCB'.")

    # Select best point
    best_index = np.argmax(acquisition_map)
    next_query = X_grid[best_index]
    return next_query, acquisition_map, X_grid#, mean, std


In [None]:
def create_nd_grid_1(bounds, grid_size=50, dimension=1):
    """
    Create an N-dimensional grid over the given bounds with reduced density.
    Parameters:
    - bounds: List of (low, high) tuples for each dimension.
    - grid_size: Number of points per axis before downsampling.
    - dimension: Downsampling factor. The final number of points will be reduced
                 by approximately dimension^2 in total.
    Returns:
    - grid: A (M x N) array of grid points, where M is reduced.
    """
    print("Dimension of input space: ", dimension)
    # Generate full-resolution axes
    axes = [np.linspace(low, high, grid_size) for (low, high) in bounds]
    print("Each axis has ", len(axes[0]), " points")
    # Create meshgrid from full-resolution axes
    mesh = np.meshgrid(*axes)
    print
    if dimension is not None:
        # Downsample each axis by selecting every `dimension`-th point
        reduced_axes = [axis[::dimension] for axis in axes]
        # Create meshgrid from reduced axes
        mesh = np.meshgrid(*reduced_axes, indexing='ij')
    # Flatten meshgrid into (M x N) grid
    grid = np.stack([m.ravel() for m in mesh], axis=-1)
    print("Function will evaluate accross : ", grid.shape[0], " points")
    return grid

def create_nd_grid(bounds, grid_size=50, dimension=1):
    """
    Create an N-dimensional grid over the given bounds with reduced density.
    Parameters:
    - bounds: List of (low, high) tuples for each dimension.
    - grid_size: Number of points per axis before downsampling.
    - dimension: Downsampling factor. The final number of points will be reduced
                 by approximately dimension^2 in total.
    Returns:
    - grid: A (M x N) array of grid points, where M is reduced.
    """
    print("Dimension of input space:", len(bounds))
    axes = [np.linspace(low, high, grid_size) for (low, high) in bounds]
    print("Each axis has", len(axes[0]), "points before downsampling")

    # Downsample each axis
    reduced_axes = [axis[::dimension] for axis in axes]
    print("Each axis has", len(reduced_axes[0]), "points after downsampling")

    # Create meshgrid from reduced axes
    mesh = np.meshgrid(*reduced_axes, indexing='ij')

    # Flatten meshgrid into (M x N) grid
    grid = np.stack([m.ravel() for m in mesh], axis=-1)
    print("Function will evaluate across:", grid.shape[0], "points")

    return grid

def select_next_query_multi(X, y, method='PI', xi=0.1, kappa=2.0, grid_size=100, dimension=None,
                      apply_scaling=False, title="Acquisition Function"):
    """
    Selects the next query point using a specified acquisition strategy.

    Parameters:
    - X: np.ndarray of shape (n_samples, n_features), input features
    - y: np.ndarray of shape (n_samples,), output values
    - method: str, one of 'PI', 'EI', 'UCB'
    - xi: float, exploration parameter for PI/EI
    - kappa: float, exploration parameter for UCB
    - grid_size: int, resolution of the search grid
    - apply_scaling: bool, whether to standardize y
    - visualize: bool, whether to plot acquisition surface (only for 2D)
    - title: str, plot title

    Returns:
    - next_query: np.ndarray of shape (n_features,), the selected input point
    - acquisition_map: np.ndarray of shape (grid_size**n,), acquisition values
    - X_grid: np.ndarray of shape (grid_size**n, n_features), grid points evaluated
    - mean: np.ndarray of shape (grid_size**n,), GP mean predictions
    - std: np.ndarray of shape (grid_size**n,), GP std predictions
    """
    if apply_scaling:
        scaler = StandardScaler()
        y = scaler.fit_transform(y.reshape(-1, 1)).ravel()

    n_features = X.shape[1]
    print("Number of features: ", n_features)
    bounds = [(0, 1)] * n_features
    X_grid = create_nd_grid(bounds, grid_size, dimension)

    kernel = RBF(length_scale=0.1, length_scale_bounds='fixed')
    gp = GaussianProcessRegressor(kernel=kernel, alpha=1e-10)
    gp.fit(X, y)

    mean, std = gp.predict(X_grid, return_std=True)
    print("Mean ", len(mean))
    print("Std  ", len(std))

    if method == 'PI':
        y_max = np.max(y)
        z = (mean - y_max - xi) / (std + 1e-12)
        acquisition_map = norm.cdf(z)

    elif method == 'EI':
        y_max = np.max(y)
        z = (mean - y_max - xi) / (std + 1e-12)
        acquisition_map = (mean - y_max - xi) * norm.cdf(z) + std * norm.pdf(z)

    elif method == 'UCB':
        acquisition_map = mean + kappa * std

    else:
        raise ValueError("Unsupported method. Choose from 'PI', 'EI', or 'UCB'.")

    best_index = np.argmax(acquisition_map)
    next_query = X_grid[best_index]

    return next_query, acquisition_map, X_grid

In [None]:
def plot_acquisition_heatmap(
    X,                   # shape (n_samples, 2): queried points
    acquisition_map,     # shape (grid_size**2,): acquisition values
    X_grid,              # shape (grid_size**2, 2): grid points
    next_query=None,     # shape (2,), optional: next query point
    title='Acquisition Heatmap',
    cmap='viridis',
    color_range=(-3, 1), # default color scale for consistency
    save_path=None       # optional: path to save plot
):
    grid_size = int(np.sqrt(len(acquisition_map)))
    acquisition_2d = acquisition_map.reshape(grid_size, grid_size)

    # Compute extent from grid
    x_min, x_max = X_grid[:, 0].min(), X_grid[:, 0].max()
    y_min, y_max = X_grid[:, 1].min(), X_grid[:, 1].max()
    extent = [x_min, x_max, y_min, y_max]

    plt.figure(figsize=(8, 6))
    im = plt.imshow(
        acquisition_2d,
        origin='lower',
        extent=extent,
        cmap=cmap,
        aspect='auto',
        vmin=color_range[0],
        vmax=color_range[1]
    )
    plt.colorbar(im, label='Acquisition Value')

    # Queried points
    plt.scatter(X[:, 0], X[:, 1], c='white', edgecolors='black', label='Queried Points')

    # Next query
    if next_query is not None:
        plt.plot(next_query[0], next_query[1], 'r*', markersize=14, label='Next Query')

    plt.title(title)
    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.legend()
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path)
    else:
        plt.show()

In [None]:

def log_query_result(method, param, x, y_val):
    """Stores a query result with dimensionality awareness."""
    query_results.append({
        'method': method,
        'xi': param if method in ['PI', 'EI'] else None,
        'kappa': param if method == 'UCB' else None,
        'x': np.array(x),
        'y': y_val,
        'dim': len(x)
    })

def log_query_candidate(method, param, x, acq_score):
    """Stores a candidate query point and its acquisition score."""
    query_results.append({
        'method': method,
        'xi': param if method in ['PI', 'EI'] else None,
        'kappa': param if method == 'UCB' else None,
        'x': np.array(x),
        'score': acq_score,
        'dim': len(x)
    })

def find_best_query(results):
    print("Total number of points to evaluate:", len(results))
    """Returns the best query point from a collection of arbitrary dimensions."""
    if not results:
        raise ValueError("No results to evaluate.")

    best = max(results, key=lambda r: r['y'])

    print("üîç Best query found:")
    print(f"Method: {best['method']}")
    if best['method'] in ['PI', 'EI']:
        print(f"xi: {best['xi']}")
    else:
        print(f"kappa: {best['kappa']}")
    print(f"Dimensions: {best['dim']}")
    print(f"x: {np.array2string(best['x'], precision=4, separator=', ')}")
    print(f"f(x): {best['y']:.4f}")

    return best

def find_best_candidate(results):
    print("Total number of points to evaluate:", len(results))
    for item in results:
        print(item)
    """Returns the best candidate point based on acquisition score."""
    if not results:
        raise ValueError("No candidates to evaluate.")

    best = max(results, key=lambda r: r['score'])

    print("üîç Best candidate (highest acquisition score):")
    print(f"Method: {best['method']}")
    if best['method'] in ['PI', 'EI']:
        print(f"xi: {best['xi']}")
    else:
        print(f"kappa: {best['kappa']}")
    print(f"Dimensions: {best['dim']}")
    print(f"x: {np.array2string(best['x'], precision=4, separator=', ')}")
    print(f"Score: {best['score']:.6f}")

    return best