In [161]:
import numpy as np
import chaospy as cp
import GPy

class SequentialPCKriging:
    def __init__(self, input_dist, order=3):
        self.input_dist = input_dist
        self.expansion = cp.generate_expansion(order, input_dist)
        self.gp_model = None

    def fit_pce(self, x, y):
        self.pce_model = cp.fit_regression(self.expansion, x.T, y)

    def pce_predict(self, x):
        return self.pce_model(x)

    def fit_kriging(self, x, y):
        discrepancy = y - self.pce_predict(x)
        print(y.shape)
        print(self.pce_predict(x))
        print(discrepancy.shape)
        kernel = GPy.kern.RBF(input_dim=2, variance=1., lengthscale=1.)
        self.gp_model = GPy.models.GPRegression(x, discrepancy, kernel)
        self.gp_model.optimize()

    def fit(self, x, y):
        self.fit_pce(x, y)
        self.fit_kriging(x, y)

    def predict(self, x_new):
        pce_pred = self.pce_predict(x_new)
        gp_pred, gp_var = self.gp_model.predict(x_new)
        return pce_pred + gp_pred, np.sqrt(gp_var)

### Expected Improvement

The idea of this acquisition function here is that

First, we find the current best in our samples $Y^{s}$: $ \hat{y}_n = \textbf{max}_{1 \le i \le n} Y^{s}_i $

Then over the space of hypothetical simulation:

$$ a(x,y) = \textbf{max} \{ 0, y - \hat_{y}_n \} $$

The improvement function:

$$ a(x,y) = \int_{-\infty}^{\infty} \textbf{max} \{ 0, y - \hat_{y}_n \} p(y|x) dy $$

The closed-form expression:

$$ a(x) = (m(x)-\hat{y}_{n}) \Phi(Z) + \sigma (x) \phi  (Z) $$
$$ Z = \frac{m(x)-\hat{y}_n}{\sigma (x)} $$

Note here it does not really matter what the metamodel is, the only assumption made here is that our predictive distribution is Gaussian.


In [162]:
from scipy.stats import norm

def constrained_expected_improvement(X, X_sample, Y_sample, f_sur, g_sur, xi=0.01):
    '''
    Computes the Constrained Expected Improvement (CEI) at points X based on existing
    samples X_sample and Y_sample using a Gaussian process surrogate model for the
    objective function and a constraint.

    Args:
    X: Points at which CEI shall be computed (m x d).
    X_sample: Sample locations (n x d).
    Y_sample: Sample values (n x 1).
    f_sur: A trained surrogate model for the objective function.
    g_sur: A trained surrogate model for the constraint.
    xi: Exploitation-exploration trade-off parameter.

    Returns:
    Constrained Expected Improvements at points X.
    '''
    # Making a prediction at point X for the objective function
    mu_f, sigma_f = f_sur.predict(X, return_std=True)

    # Calculating the improvement for the objective function
    mu_sample_opt = np.max(Y_sample)
    imp_f = mu_f - mu_sample_opt - xi
    Z_f = imp_f / sigma_f
    ei = imp_f * norm.cdf(Z_f) + sigma_f * norm.pdf(Z_f)
    ei[sigma_f == 0.0] = 0.0  # If sigma_f is 0, the expected improvement is 0

    # Making a prediction at point X for the constraint
    mu_g, sigma_g = g_sur.predict(X, return_std=True)

    # Calculating the probability of feasibility
    P_feasibility = norm.cdf(0, loc=mu_g, scale=sigma_g)

    # Multiplying EI by the probability of feasibility
    cei = ei * P_feasibility

    return cei


In [163]:
from scipy.optimize import minimize

def propose_location_constrained(acquisition, X_sample, Y_sample, fk, gk, bounds, xi=0.01):
    """
    Proposes the next sampling point by optimizing the constrained acquisition function.

    Args:
        acquisition: Acquisition function.
        X_sample: Sample locations (n x d).
        Y_sample: Sample values (n x 1).
        fk: Trained surrogate model for objective function.
        gk: Trained surrogate model for constraint.
        bounds: Bounds of the design space (2d array).
        xi: Exploration-exploitation trade-off parameter.

    Returns:
        Location of the next sampling point.
    """
    dim = X_sample.shape[1]
    min_val = 1
    min_x = None

    def min_obj(X):
        # Minimization objective is the negative acquisition function
        return -acquisition(X.reshape(-1, dim), X_sample, Y_sample, fk, gk, xi)

    # Start from multiple points to avoid local minima
    for starting_point in np.random.uniform(bounds[:, 0], bounds[:, 1], size=(10, dim)):
        res = minimize(min_obj, starting_point.reshape(1, -1), bounds=bounds, method='L-BFGS-B')
        if res.fun < min_val:
            min_val = res.fun[0]
            min_x = res.x

    return min_x.reshape(-1, 1)

In [164]:
def optimize(f, g, bounds, input_dist, n_iter=25, initial_samples=5, xi=0.01):
    """
    Main optimization loop with constraints.

    Args:
        f: Objective function.
        g: Constraint function (should be <= 0 for feasible points).
        bounds: Bounds of the design space (2d array).
        n_iter: Number of iterations.
        initial_samples: Number of initial random samples.
        xi: Exploration-exploitation trade-off parameter.

    Returns:
        Optimal value and corresponding point.
    """
    # Randomly sample the initial design space
    X_sample = np.random.uniform(bounds[:, 0], bounds[:, 1], size=(initial_samples, len(bounds)))
    Y_sample = np.array([f(x) for x in X_sample])
    Yg_sample = np.array([g(x) for x in X_sample])

    # Initialize and fit Kriging models
    fk = SequentialPCKriging(input_dist, order=5)
    gk = SequentialPCKriging(input_dist, order=5)
    fk.fit(X_sample, Y_sample)
    gk.fit(X_sample, Yg_sample)

    for i in range(n_iter):
        fk.fit(X_sample, Y_sample)
        gk.fit(X_sample, Yg_sample)

        # Obtain the next sampling point and evaluate f and g
        X_next = propose_location_constrained(constrained_expected_improvement, X_sample, Y_sample, fk, gk, bounds, xi)
        Y_next = np.array([f(x) for x in X_next])
        Yg_next = np.array([g(x) for x in X_next])

        # Update samples
        X_sample = np.vstack((X_sample, X_next))
        Y_sample = np.append(Y_sample, Y_next)
        Yg_sample = np.append(Yg_sample, Yg_next)

    # Find the best feasible solution
    feasible_mask = Yg_sample <= 0
    if np.any(feasible_mask):
        feasible_Y_sample = Y_sample[feasible_mask]
        feasible_X_sample = X_sample[feasible_mask]
        opt_val = np.max(feasible_Y_sample)
        opt_point = feasible_X_sample[np.argmax(feasible_Y_sample)]
    else:
        opt_val = None
        opt_point = None

    return opt_val, opt_point


In [165]:
def f_test(x):
    x1, x2 = x
    return (4-2.1*x1**2 + 1/3*x1**4) * x1**2 + x1*x2 + (-4+4*x2**2)*x2**2

def g_test(x):
    x1, x2 = x
    return -1

# Test the optimization function
bounds = np.array([[-1, 1],
                   [-1, 1]])  # Bounds of the design space
opt_val, opt_point = optimize(f_test, g_test, bounds, cp.Uniform(-1,1), n_iter=3, initial_samples=5, xi=0.02)

print("Optimal Value:", opt_val)
print("Optimal Point:", opt_point)

ValueError: operands could not be broadcast together with shapes (5,) (5,2) 

In [167]:
X_sample = np.random.uniform(bounds[:, 0], bounds[:, 1], size=(5, len(bounds)))
Y_sample = np.array([f_test(x) for x in X_sample])
expansion = cp.generate_expansion(5, cp.Uniform(-1,1))
pce_model = cp.fit_regression(expansion, X_sample.T, Y_sample)

In [169]:
pce_model(X_sample.T)

array([[ 0.18021571,  1.12822619, -0.83127476, -0.74743496, -0.3725491 ],
       [-1.50357188, -0.28331356,  5.20889964, 26.73076189, 24.3253513 ]])

In [171]:
pce_model(X_sample)

array([[ 0.18021571, -1.50357188],
       [ 1.12822619, -0.28331356],
       [-0.83127476,  5.20889964],
       [-0.74743496, 26.73076189],
       [-0.3725491 , 24.3253513 ]])