# Helper Function

In [None]:
"""
This module contains all the functions that define the kernels
"""
import numpy as np
import matplotlib.pyplot as plt
import torch


def full_kernel_classic(x, y):
    """
    Evaluate the product kernel prod_i cos^2((x_i - y_i) / 2) classically.

    :param x: first sample, an indexable sequence of angles
    :param y: second sample, indexable with at least ``len(x)`` entries
    :return: the kernel value; ``1`` when ``x`` is empty
    """
    value = 1
    for idx, x_i in enumerate(x):
        half_angle = 1/2*(x_i-y[idx])
        # one factor per coordinate
        value = value * np.cos(half_angle)**2
    return value


def kernel_matrix_classic(X, Y):
    """
    Build the kernel (Gram) matrix of the full kernel entry by entry.

    :param X: sequence of samples (rows of the result)
    :param Y: sequence of samples (columns of the result)
    :return: array of shape (len(X), len(Y)) with entry [i, j] equal to
        full_kernel_classic(X[i], Y[j])
    """
    gram = np.zeros((len(X), len(Y)))
    # ndindex walks the (row, col) pairs in row-major order
    for row, col in np.ndindex(*gram.shape):
        gram[row, col] = full_kernel_classic(X[row], Y[col])
    return gram


def kernel_matrix_classic_torch(X, Y):
    """
    Compute the kernel matrix of the full kernel with torch tensor operations.

    :param X: samples as a torch tensor or numpy array (numpy arrays are
        wrapped with torch.from_numpy)
    :param Y: samples as a torch tensor or numpy array
    :return: torch tensor whose entry [i, j] is prod_k cos^2((x_ik - y_jk) / 2)
    """
    lhs = torch.from_numpy(X) if type(X) is np.ndarray else X
    rhs = torch.from_numpy(Y) if type(Y) is np.ndarray else Y
    # broadcasting the two unsqueezed tensors yields entry [i, j, k] = x_ik - y_jk
    delta = lhs.unsqueeze(1) - rhs.unsqueeze(0)
    # multiply the per-coordinate factors along the last axis
    return torch.cos(delta / 2).pow(2).prod(dim=2)



In [None]:
import numpy as np

class GaussianProcessRegressor:
    """
    Minimal Gaussian-process regressor driven by a user-supplied kernel.

    ``kernel(A, B)`` must return the kernel matrix between the rows of ``A``
    and the rows of ``B``. ``alpha`` is added to the diagonal of the training
    kernel matrix before it is inverted.
    """
    def __init__(self, kernel, alpha=1e-5):
        self.kernel = kernel
        self.alpha = alpha
        self.X_train = None
        self.y_train = None
        self.K_inv = None

    def fit(self, X, y):
        """
        Store the training data and the inverse of the regularised kernel matrix.

        :param X: training inputs (converted with np.array)
        :param y: training targets (converted with np.array)
        """
        X = np.array(X)
        y = np.array(y)
        self.X_train = X
        self.y_train = y
        # Work on a private copy so the kernel's own return value is never
        # mutated, and promote integer matrices so adding alpha cannot fail
        # with a casting error.
        K = np.array(self.kernel(X, X))
        if not np.issubdtype(K.dtype, np.inexact):
            K = K.astype(float)
        K[np.diag_indices_from(K)] += self.alpha
        self.K_inv = np.linalg.inv(K)

    def predict(self, X_test, return_std=False):
        """
        Predict the posterior mean (and optionally the standard deviation).

        :param X_test: test inputs (converted with np.array)
        :param return_std: when true, also return the per-point posterior std
        :return: ``y_mean`` or the tuple ``(y_mean, y_std)``
        :raises RuntimeError: if called before ``fit``
        """
        if self.K_inv is None:
            raise RuntimeError("predict() called before fit()")

        X_test = np.array(X_test)
        K_star = self.kernel(X_test, self.X_train)
        y_mean = K_star @ (self.K_inv @ self.y_train)

        if not return_std:
            return y_mean

        K_star_star = self.kernel(X_test, X_test)
        # Only the diagonal of K** - K* K^-1 K*^T is needed, so compute the
        # row-wise dot products instead of the full m x m product.
        explained = np.einsum('ij,ij->i', K_star @ self.K_inv, K_star)
        var = np.diag(np.asarray(K_star_star)) - explained
        # round-off can push tiny variances slightly below zero
        var = np.maximum(var, 0.0)
        y_std = np.sqrt(var)
        return y_mean, y_std


In [None]:
from itertools import product

def build_grid(bounds, n_grid):
    """
    Enumerate a uniform Cartesian grid inside the box described by ``bounds``.

    Parameters
    ----------
    bounds : list of (low, high)
        One interval per dimension (length d).
    n_grid : int
        Number of equally spaced points per dimension, endpoints included.

    Returns
    -------
    X_grid : np.ndarray of shape (n_grid^d, d)
        Every grid point, ordered so that the last coordinate varies fastest.
    """
    axes = []
    for low, high in bounds:
        # equally spaced coordinates along this dimension
        axes.append(np.linspace(low, high, n_grid))
    # itertools.product yields one d-tuple per grid point
    points = tuple(product(*axes))
    return np.array(points)




In [None]:
import numpy as np
import matplotlib.pyplot as plt
from itertools import product
from scipy.stats import norm



def SquareCB_nd(
    f,                 # black-box reward function, shape: (n, d) -> (n,)
    gp,                # a GaussianProcessRegressor (our "oracle")
    bounds,            # list of (low, high) for each dimension
    n_iter=100,         # number of SquareCB iterations
    init_points=3,     # number of initial random samples
    n_grid=5,         # discrete set of arms = the grid
    gamma=1.0,         # SquareCB learning-rate parameter (currently overridden, see below)
    mu=None,           # exploration offset (currently overridden, see below)
    random_state=None,
    noise_std=0.1,     # optional: noise level for synthetic reward observation
    verbose=False,
    beta_func = None,
    error = 1
):
    """
    Illustrative 'SquareCB' variant for Bayesian Opt. with a GP 'oracle' + discrete grid.

    Each round the GP is refit on all data gathered so far, its predicted mean
    on the grid is turned into a sampling distribution via inverse-gap
    weighting, one arm is drawn from that distribution, and a noisy reward is
    observed for it.

    Parameters
    ----------
    f : callable
        The unknown reward function mapping array (n,d)->(n,).
    gp : GaussianProcessRegressor
        Must provide .fit(X, y) and .predict(X) returning the predictive mean.
    bounds : list of (low, high)
        The domain bounding box in d dimensions.
    n_iter : int
        Number of SquareCB rounds.
    init_points : int
        How many uniformly random points to sample before the first round.
    n_grid : int
        Grid resolution in each dimension.  => K = n_grid^d arms.
    gamma : float
        Accepted for interface compatibility.  NOTE(review): the value is
        ignored; the implementation always uses gamma = K.
    mu : float or None
        Accepted for interface compatibility.  NOTE(review): the value is
        ignored; the implementation always uses mu = K.
    random_state : int or None
        If given, seeds the global NumPy RNG before the initial samples are
        drawn.  The RNG is re-seeded from OS entropy inside every round, so
        only the initial design is reproducible.
    noise_std : float
        Standard deviation of the Gaussian noise added to observed rewards.
    verbose : bool
        Print iteration details if True.
    beta_func, error :
        Accepted for interface compatibility; not used by this function.

    Returns
    -------
    regrets : ndarray of shape (n_iter + init_points,)
        Cumulative regret after each sample, measured against the best grid
        value of ``f`` using noise-free rewards.
    X_samples : ndarray of shape (n_iter + init_points, d)
        The sampled points (random initial points, then chosen grid arms).
    Y_samples : ndarray of shape (n_iter + init_points,)
        The noisy observed rewards from f.
    """

    if random_state is not None:
        np.random.seed(random_state)

    # 1) Build the discrete action set via a grid
    X_grid = build_grid(bounds, n_grid=n_grid)   # shape (K, d)
    K = X_grid.shape[0]                          # total number of arms

    # NOTE(review): the exploration offset and learning rate are fixed to the
    # number of arms regardless of the `mu` / `gamma` arguments.  This matches
    # the behaviour existing experiments were run with; confirm before
    # honouring the arguments instead.
    mu = K
    gamma = K

    # 2) Best reward on the grid, used as the regret baseline
    Y_grid = f(X_grid)
    idx_best = np.argmax(Y_grid)
    f_star = Y_grid[idx_best]

    # 3) Initialise the data set with uniformly random points from the domain
    def sample_random(n):
        return np.array([
            [np.random.uniform(low, high) for (low, high) in bounds]
            for _ in range(n)
        ])
    X_samples = sample_random(init_points)            # (init_points, d)
    y_true_init = f(X_samples)                        # noise-free rewards
    Y_samples = y_true_init + noise_std * np.random.randn(init_points)

    # 4) Track cumulative regret.  The noise-free rewards are used here so the
    #    initial points are scored the same way as the main-loop rounds.
    regrets = np.zeros(n_iter + init_points)
    cum_regret = 0.0
    for i in range(init_points):
        cum_regret += (f_star - y_true_init[i])
        regrets[i] = cum_regret

    # 5) Main loop: 'SquareCB'
    for step in range(n_iter):
        t = init_points + step + 1  # 1-based iteration index

        # (a) Fit the GP on current data (X_samples, Y_samples)
        gp.fit(X_samples, Y_samples)

        # (b) Predicted reward of every arm.  Only the mean is used, so the
        #     predictive std (and its K x K kernel evaluation) is not requested.
        mean_grid = gp.predict(X_grid, return_std=False)

        # Re-seed the global RNG from OS entropy before sampling.
        # NOTE(review): presumably a guard against kernels that reseed the
        # global RNG during fit/predict; it also makes the arm draws and the
        # observation noise non-reproducible.
        np.random.seed(None)

        # SquareCB works with predicted losses; use loss = -predicted reward.
        y_hat_grid = -mean_grid

        # (c) Greedy arm b_t = arg min predicted loss
        b_idx = np.argmin(y_hat_grid)

        # (d) Inverse-gap weighting:
        #       p_t(a) = 1 / [mu + gamma * (y_hat[a] - y_hat[b])]   for a != b
        #       p_t(b) = 1 - sum_{a != b} p_t(a)
        gaps = np.maximum(0.0, y_hat_grid - y_hat_grid[b_idx])
        p = 1.0 / (mu + gamma * gaps)
        p[b_idx] = 0.0
        p[b_idx] = 1.0 - p.sum()
        if p[b_idx] < 0.0:
            # If numerical issues occur, clip and renormalise
            p = np.clip(p, 0, None)
            p /= np.sum(p)

        # (e) Sample next action x_next from distribution p
        a_next = np.random.choice(K, p=p)
        x_next = X_grid[a_next].reshape(1, -1)

        # (f) Observe reward from f with optional noise
        y_true = f(x_next)  # shape (1,)
        noise = noise_std * np.random.randn()
        y_obs = y_true[0] + noise

        # (g) Update data
        X_samples = np.vstack([X_samples, x_next])
        Y_samples = np.concatenate([Y_samples, [y_obs]])

        # (h) Update cumulative regret
        cum_regret += (f_star - y_true[0])
        regrets[init_points + step] = cum_regret

        if verbose:
            print(f"[SquareCB] Round {step+1}/{n_iter}, t={t}, bestArm={b_idx}, "
                  f"ChosenArm={a_next}, regret={cum_regret:.4f}")

    return regrets, X_samples, Y_samples





import numpy as np
import matplotlib.pyplot as plt
from tqdm import trange

def run_multiple_experiments_SquareCB(
    f,
    gp,
    bounds,
    n_runs=5,
    n_iter=10,
    init_points=3,
    n_grid=50,
    beta_func=None,
    random_state=None,
    error = 1,
    verbose=False
):
    """
    Run SquareCB_nd ``n_runs`` times and aggregate the cumulative-regret curves.

    Parameters
    ----------
    f : callable
        The black-box reward function.
    gp : object
        Regressor handed to SquareCB_nd.  The same object is passed to every
        run; SquareCB_nd calls its ``fit`` method at the start of each round.
    bounds : list of (low, high)
        Domain bounding box.
    n_runs : int
        Number of runs to aggregate over.
    n_iter : int
        Number of SquareCB rounds per run (not counting the init_points).
    init_points : int
        Number of random initial points in each run.
    n_grid : int
        Grid resolution per dimension used to build the set of arms.
    beta_func : callable or None
        Forwarded to SquareCB_nd unchanged.
    random_state : int or None
        If set, the global NumPy RNG is seeded with ``random_state + i``
        before run ``i``.
    error : number
        Forwarded to SquareCB_nd unchanged.
    verbose : bool
        Forwarded to SquareCB_nd for the first run only.

    Returns
    -------
    avg_regret : ndarray of shape (n_iter + init_points,)
        Pointwise mean of the cumulative regret across runs.
    regrets_all : ndarray of shape (n_runs, n_iter + init_points)
        The cumulative-regret curve of every run.
    std_regret : ndarray of shape (n_iter + init_points,)
        Pointwise standard deviation of the cumulative regret across runs.
    """
    curves = []

    for run_idx in trange(n_runs, desc="GP-UCB Experiments"):
        # distinct seed per run when a base seed was supplied
        if random_state is not None:
            np.random.seed(random_state + run_idx)

        run_regret, _, _ = SquareCB_nd(
            f=f,
            gp=gp,
            bounds=bounds,
            n_iter=n_iter,
            init_points=init_points,
            n_grid=n_grid,
            beta_func=beta_func,
            random_state=None,  # seeding is handled above
            error = error,
            verbose=(verbose and run_idx == 0)
        )
        curves.append(run_regret)

    regrets_all = np.array(curves)  # shape (n_runs, n_steps)
    return regrets_all.mean(axis=0), regrets_all, regrets_all.std(axis=0)





In [None]:
pip install PennyLane~=0.40.0

In [None]:
"""
This module contains all the functions that define the kernels
"""
import pennylane as qml
from pennylane.templates.layers import RandomLayers
import numpy as np
import matplotlib.pyplot as plt
import torch


#@qml.template
def product_encoding(x, wires=[0]):
    """
    Encode each data coordinate with a single-qubit RX rotation.

    The k-th entry of ``x`` is applied to the k-th wire in ``wires``, so the
    wire labels do not have to be 0..n-1.

    :param x: sequence of rotation angles, one per wire
    :param wires: wire labels to act on (the default list is never mutated)
    :raises AssertionError: if ``x`` and ``wires`` differ in length
    """
    assert len(x) == len(wires), 'number of parameters does not match number of qubits'
    # pair data with wires by position rather than indexing x by wire label
    for angle, wire in zip(x, wires):
        qml.RX(angle, wires=wire)


def circuit_function(x, data_encoding, qubits, seed, return_reduced_state=False, M="Z", reduced_state=0):
    """
    Quantum function: data encoding followed by a seeded block of RandomLayers.

    Intended to be wrapped in a ``qml.QNode``.

    :param x: data point passed to ``data_encoding``
    :param data_encoding: callable ``data_encoding(x, wires=...)`` applying the encoding gates
    :param qubits: number of wires; wires ``0..qubits-1`` are used
    :param seed: seed for the rotation angles and for ``RandomLayers``
    :param return_reduced_state: if truthy, return the density matrix of ``reduced_state``
    :param M: observable measured on wire 0 otherwise: ``"Z"`` (Pauli-Z) or ``"0"`` (|0><0|)
    :param reduced_state: wire or wires handed to ``qml.density_matrix``
    :return: a PennyLane measurement
    :raises ValueError: if ``M`` is not one of the supported observables
    """
    wires = list(range(qubits))
    # First encode the data
    data_encoding(x, wires=wires)

    # Angles of the random unitary V.  A local RandomState yields the same
    # numbers as seeding the global RNG with `seed`, without clobbering the
    # global NumPy random state on every circuit evaluation.
    layers = qubits
    rng = np.random.RandomState(seed)
    weights = rng.uniform(0, 2*np.pi, size=(layers, qubits))
    ratio_imprim = 0.5
    RandomLayers(weights=weights, ratio_imprim=ratio_imprim, wires=wires, seed=seed)

    if return_reduced_state:
        # reduced state of the selected wire(s), used by the biased kernel
        return qml.density_matrix(reduced_state)

    # otherwise return the target function f(x), measured on the first qubit
    if M == "Z":
        return qml.expval(qml.PauliZ(0))
    if M == "0":
        return qml.expval(qml.Hermitian([[1, 0], [0, 0]], wires=0))
    raise ValueError(f"unsupported observable M={M!r}; expected 'Z' or '0'")


def full_kernel_fct(x, y, data_encoding):
    """
    Quantum function for the full kernel: encode ``x``, un-encode ``y`` and
    measure the projector onto the all-zero state.

    Intended to be wrapped in a ``qml.QNode``.

    :param x: arg 1
    :param y: arg 2
    :param data_encoding: callable ``data_encoding(x, wires=...)`` defining the encoding unitary
    :return: expectation value of |0...0><0...0|
    """
    # initial state is all zero
    qubits = len(x)
    wires = list(range(qubits))  # qubits to work on
    # encode first argument
    data_encoding(x, wires=wires)
    # inversely encode second argument; qml.adjoint wraps the quantum function
    # itself (the removed qml.inv took already-created operations)
    qml.adjoint(data_encoding)(y, wires=wires)

    # project back onto the all 0 state
    projector = np.zeros((2 ** qubits, 2 ** qubits))
    projector[0, 0] = 1
    return qml.expval(qml.Hermitian(projector, wires=range(qubits)))


def full_kernel_classic(x, y):
    """
    Evaluate the product kernel prod_i cos^2((x_i - y_i) / 2) classically.

    :param x: first sample, an indexable sequence of angles
    :param y: second sample, indexable with at least ``len(x)`` entries
    :return: the kernel value; ``1`` when ``x`` is empty
    """
    value = 1
    for idx, x_i in enumerate(x):
        half_angle = 1/2*(x_i-y[idx])
        # one factor per coordinate
        value = value * np.cos(half_angle)**2
    return value


def kernel_matrix_classic(X, Y):
    """
    Build the kernel (Gram) matrix of the full kernel entry by entry.

    :param X: sequence of samples (rows of the result)
    :param Y: sequence of samples (columns of the result)
    :return: array of shape (len(X), len(Y)) with entry [i, j] equal to
        full_kernel_classic(X[i], Y[j])
    """
    gram = np.zeros((len(X), len(Y)))
    # ndindex walks the (row, col) pairs in row-major order
    for row, col in np.ndindex(*gram.shape):
        gram[row, col] = full_kernel_classic(X[row], Y[col])
    return gram


def kernel_matrix_classic_torch(X, Y):
    """
    Compute the kernel matrix of the full kernel with torch tensor operations.

    :param X: samples as a torch tensor or numpy array (numpy arrays are
        wrapped with torch.from_numpy)
    :param Y: samples as a torch tensor or numpy array
    :return: torch tensor whose entry [i, j] is prod_k cos^2((x_ik - y_jk) / 2)
    """
    lhs = torch.from_numpy(X) if type(X) is np.ndarray else X
    rhs = torch.from_numpy(Y) if type(Y) is np.ndarray else Y
    # broadcasting the two unsqueezed tensors yields entry [i, j, k] = x_ik - y_jk
    delta = lhs.unsqueeze(1) - rhs.unsqueeze(0)
    # multiply the per-coordinate factors along the last axis
    return torch.cos(delta / 2).pow(2).prod(dim=2)


def biased_kernel_fct(x, y, reduced_state):
    """
    Evaluate the biased kernel k(x, y) = Re Tr[rho(x) rho(y)].

    :param x: first input
    :param y: second input
    :param reduced_state: callable mapping a single input to its reduced
        density operator (a square matrix)
    :return: real part of the trace of the product of the two operators
    """
    overlap = np.trace(reduced_state(x) @ reduced_state(y))
    return np.real(overlap)


def biased_kernel_matrix(X, Y, reduced_state):
    """
    Compute the kernel matrix of the biased kernel with torch.

    :param X: iterable of inputs (rows of the result)
    :param Y: iterable of inputs (columns of the result)
    :param reduced_state: callable mapping a single input to its reduced
        density operator
    :return: torch tensor with entry [a, b] = Re Tr[rho(X[a]) rho(Y[b])]
    """
    # Stack the per-sample operators into one ndarray first: handing torch a
    # Python list of ndarrays is very slow and triggers a UserWarning.
    rho_X = torch.tensor(np.array([np.array(reduced_state(x)) for x in X]))
    rho_Y = torch.tensor(np.array([np.array(reduced_state(y)) for y in Y]))
    # 'aik,bki->ab' sums rho_a[i, k] * rho_b[k, i], i.e. Tr[rho_a rho_b]
    K = torch.einsum('aik,bki -> ab', rho_X, rho_Y)
    # np.real reads the tensor's .real attribute, so the result stays a torch tensor
    return np.real(K)

def get_functions(qubits, seed, M='Z', second_qubit=False,dim=1):
    """
    Utility function that creates all the functionalities for an experimental setting.

    :param qubits: number of qubits involved == dimensionality of data
    :param seed: random seed to generate V
    :param M: observable on the first qubit that defines the target function
    :param second_qubit: if true (and qubits > 1), the last returned element is
        the kernel matrix function built from the reduced state of wire 1
    :param dim: how many wire subsets (taken in order from
        [0], [1], [2], [0,1], [0,2], [1,2], [0,1,2]) contribute a projected
        kernel; the returned ``kernel_matrix_bias`` averages them
    :return: tuple ``(k_prod, k_prod_bias, f, kernel_matrix_bias, extra)`` where
        ``extra`` is the wire-1 kernel matrix function when ``second_qubit`` applies
        and ``kernel_matrix_bias`` otherwise
    :raises ValueError: if ``dim`` < 1 or a selected subset uses a wire that
        does not exist on a ``qubits``-wire device
    """
    if dim < 1:
        raise ValueError(f"dim must be at least 1, got {dim}")
    subset_index = [[0], [1], [2], [0, 1], [0, 2], [1, 2], [0, 1, 2]][:dim]
    if any(wire >= qubits for subset in subset_index for wire in subset):
        raise ValueError(
            f"dim={dim} selects wire subsets that do not fit on {qubits} qubit(s)"
        )

    dev = qml.device('default.qubit', wires=qubits)

    # full kernel
    k_prod_fct = lambda x, y: full_kernel_fct(x, y, data_encoding=product_encoding)
    k_prod = qml.QNode(k_prod_fct, dev)

    # one projected (biased) kernel matrix function per selected wire subset
    projected_kernels = []
    reduced_state = None
    for subset in subset_index:
        # bind `subset` / `reduced_state` as defaults so each closure keeps its own
        reduced_state_fct = lambda x, subset=subset: circuit_function(
            x, product_encoding, qubits, return_reduced_state=True, seed=seed, reduced_state=subset)
        reduced_state = qml.QNode(reduced_state_fct, dev)
        projected_kernels.append(
            lambda X, Y, reduced_state=reduced_state: biased_kernel_matrix(X, Y, reduced_state))

    # Scalar biased kernel.  It uses the reduced state of the LAST selected
    # subset, which is what the previous late-binding closure resolved to.
    last_reduced_state = reduced_state
    k_prod_bias = lambda x, y: biased_kernel_fct(x, y, last_reduced_state)

    def kernel_matrix_bias(X, Y):
        # average of the projected kernel matrices, without in-place updates
        total = projected_kernels[0](X, Y)
        for kernel in projected_kernels[1:]:
            total = total + kernel(X, Y)
        return total / len(projected_kernels)

    # target function
    f_fct = lambda x: circuit_function(x, product_encoding, qubits, return_reduced_state=False, seed=seed, M=M)
    f = qml.QNode(f_fct, dev)

    # potentially add second qubit biased kernel functionality
    if second_qubit and qubits > 1:
        reduced_state_fct_2 = lambda x: circuit_function(
            x, product_encoding, qubits, return_reduced_state=True, seed=seed, reduced_state=1)
        reduced_state_2 = qml.QNode(reduced_state_fct_2, dev)
        kernel_matrix_bias_2 = lambda X, Y: biased_kernel_matrix(X, Y, reduced_state_2)
        return k_prod, k_prod_bias, f, kernel_matrix_bias, kernel_matrix_bias_2
    return k_prod, k_prod_bias, f, kernel_matrix_bias, kernel_matrix_bias




In [None]:
def feature_map_full(x):
    """
    Map one 3-dimensional point to a 27-dimensional trigonometric feature vector.

    x: indexable with at least three entries; only x[0], x[1], x[2] are read.

    Returns: array of shape (27,) laid out as
             [constant,
              cos/sin of each single angle            (weight sqrt(1/8)),
              cos/sin of each pairwise sum/difference (weight sqrt(1/16)),
              cos/sin of each signed triple sum       (weight sqrt(1/32))],
             with the constant term weighted by sqrt(1/8).
    """
    x0, x1, x2 = x[0], x[1], x[2]
    u0, u1, u2, u3 = 1/8, 1/8, 1/16, 1/32

    singles = [x0, x1, x2]
    pairs = [x0+x1, x0-x1, x0+x2, x0-x2, x1+x2, x1-x2]
    triples = [x0+x1+x2, x0+x1-x2, x0-x1+x2, x0-x1-x2]

    features = [u0**0.5 * 1]
    for weight, angles in ((u1, singles), (u2, pairs), (u3, triples)):
        for angle in angles:
            # each frequency contributes a (cos, sin) pair
            features.append(weight**0.5 * np.cos(angle))
            features.append(weight**0.5 * np.sin(angle))
    return np.array(features)

# Standard deviation of the random weights.
sigma_w = 0.5
# One weight per feature: 27 matches the length of the vector returned by
# feature_map_full.  Drawn from the global NumPy RNG, so the values depend on
# its current state.
w_random = np.random.normal(loc=0.0, scale=sigma_w, size=27)
#w_random = np.array([1,1,1,1,1,1,1,1,1])
#w_random[7],w_random[8] = 1,1
print(w_random)

def f_draw_from_GP(x):
    """Evaluate the random linear model sum_k phi_k(x) * w_random[k] at one point x.

    phi is feature_map_full and w_random is the module-level weight vector.
    """
    weighted = feature_map_full(x) * w_random
    return np.sum(weighted)

# repeat for different kernel dimension

In [None]:
# NOTE(review): the first assignment is immediately overwritten by the second,
# so the all-ones weights are never used.
w_random = np.array([1 for _ in range(27)])
# Fresh random weights for the target function (read by f_draw_from_GP).
w_random = np.random.normal(loc=0.0, scale=sigma_w, size=27)


qubits = 3
seed = 1
# Search domain: [0, 2*pi] in each of the three dimensions.
bounds = [(0, 2*np.pi), (0, 2*np.pi),(0, 2*np.pi)]

# Batched wrapper: applies f_draw_from_GP to every row of X.
quantum_f = lambda X: np.array([f_draw_from_GP(X[i]) for i in range(X.shape[0])])


# Per-dim results collected for plotting.
dim_tried = []
regret_for_dim = []
std_regret_for_dim = []

# Sweep the number of projected kernels (dim = 1..7) used by the GP model.
for dim in trange(1,8, desc="Number of projected kernels Used"):

  # Only kernel_matrix_bias is used below; the returned QNode `f` and
  # `kernel_second_qubit` are not referenced in this cell.
  _, _, f, kernel_matrix_bias, kernel_second_qubit = get_functions(
            qubits=3, seed=seed, second_qubit=True,dim=dim
        )

  # The kernel returns a torch tensor; the GP works on numpy arrays.
  kernell = lambda X,Y: kernel_matrix_bias(X,Y).numpy()

  gp = GaussianProcessRegressor(kernel=kernell, alpha=1e-4)

  # The reward function is quantum_f (the random-feature model above).
  avg_best, best_hist_all,std_regret = run_multiple_experiments_SquareCB(
  f=quantum_f,
  gp=gp,
  bounds=bounds,
  n_runs=30,
  n_iter=100,
  init_points=1,
  n_grid=10,
  beta_func=None,  # use the default inside SquareCB_nd
  random_state=42,
  error = 6/dim,
  verbose=False
)

  # Keep the final cumulative regret (mean and std across runs).
  # NOTE(review): the division by 6 is an unexplained scaling constant — confirm its meaning.
  dim_tried.append(dim)
  regret_for_dim.append(avg_best[-1]/6)
  std_regret_for_dim.append(std_regret[-1]/6)

# Final regret per number of projected kernels, with the across-run standard
# deviation as error bars.  The series is labelled so that plt.legend() has an
# entry to show instead of warning about missing labels.
plt.errorbar(dim_tried,regret_for_dim,std_regret_for_dim,ls="-",
             marker='d',
             color="#009E73",
             alpha=1.0,
             capsize=4,
             label="Mean ± std across runs")
plt.title("Regret for Different Kernel Used")
plt.xlabel("Number of Projected Kernel Used for Modeling")
plt.ylabel("Regret for T=100")
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Same data as a plain line plot.  The label gives plt.legend() an entry to
# display (it warns and draws nothing when no artist is labelled).
plt.plot(dim_tried,regret_for_dim, label="Mean across runs")
plt.title("Regret for Different Kernel Used")
plt.xlabel("Dimension of Kernel Used")
plt.ylabel("Regret")
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Dump the raw numbers behind the plots: one entry per value in dim_tried.
print(regret_for_dim)
print(std_regret_for_dim)