## We start be loading some modules and helpers

In [None]:
import sys
sys.path.append('.')
import numpy as np
from scipy import linalg
from helpers import *

from smt.sampling_methods import LHS

import scipy.interpolate

from docplex.mp.model import Model

from qiskit_optimization.algorithms import CplexOptimizer,CobylaOptimizer
from qiskit_optimization.translators import from_docplex_mp

from qiskit.circuit.library import EfficientSU2
from qiskit import Aer
from qiskit.algorithms import VQE
from qiskit_optimization.algorithms import MinimumEigenOptimizer
from qiskit.algorithms.optimizers import COBYLA, L_BFGS_B, GSLS, GradientDescent

from qiskit.circuit.library import RealAmplitudes
from qiskit import QuantumCircuit, ClassicalRegister, QuantumRegister
from qiskit.circuit import ParameterVector
from scipy import optimize

from scipy.optimize import minimize
from smt.surrogate_models import KRG

try:
    import matplotlib.pyplot as plt
    plot_status = True
except:
    plot_status = False
from matplotlib import cm

##  Primary classes defined in the following cells are:
- BinaryProblemBody: represents a pure QUBO problem via (objective function includes constraints via penalty term as soft constraints whose continuous variables have to be fixed for evaluation on the quantum device)
- VQE_Optimizer: QUBO optimizer using VQE. The ansatz function can be choose by preference (we offer three different ansatz functions)
- MILPproblem: a class representing the entire MILP (or here a MBLP) problem including objective function, constraints and penalty
### Solvers
We implemented two different strategies to solve the problem. 
- MILPAlternatingsolver: solve the MILPproblem with alternating approach where a QUBO with fixed continuous variables and a continuous problem with fixed binary variables are optimized in an alternating fashion.
- MILPSurrogateSovler: solve the MILPproblem by building a purely continuous surrogate model and optimizing and refining the surrogate model.

In [None]:
# BinaryProblemBody requires two initial parameter: the objective function Expr (callable) and
# nBinary, the number of binary variables
# It uses the qiskit min_eigen_optimizer to build a QUBO problem
class BinaryProblemBody:
    def __init__(self,Expr,nBinary):
        self.md = Model("MixedBinaryContinous")
        self.x = []
        for i in range(nBinary):
            self.x.append(self.md.binary_var("x"+str(i)))
        self.initExpr = Expr
        self.opti = []
        self.optivalue = 1000

    def updateBodyExpr(self,EQConstrain):
        self.Expr = lambda u,x: self.initExpr(u,x) + sum([con(u,x) for con in EQConstrain])

    def cleanConstrain(self):
        self.md.clear_constraints()
        
    def UpdateExpr(self,u):
        # evaluate the expression with fixed continous value and
        # update the minimization problem for the binary model. 
        self.md.minimize(self.Expr(u,self.x))
        
    def EvaNewBinaryOpti(self,min_eigen_optimizer):        
        # Generate QUBO from Model md and solve it
        qp = from_docplex_mp(self.md)
        self.opti = min_eigen_optimizer.solve(qp).x
        self.optivalue = min_eigen_optimizer.solve(qp).fval

In [None]:
# VQE_Optimizer requires only number of quibits to instantiate the class.
class VQE_Optimizer:
    def __init__(self,nqubits,WaveFunType = "RealAmplitudes", backend=Aer.get_backend('aer_simulator_statevector'), 
                 optimizer=COBYLA()):
        self.backend = backend
        self.nqubits = nqubits
        self.optimizer = optimizer
        self.circuit_plot = None
        self.__generateOptimizer(WaveFunType)
    
    # Define an VQE optimizer, i.e. define an ansatz and an instance of qiskit's VQE algorithm
    def __generateOptimizer(self,WaveFunType):
        ## part 1: manually set the wave up
        if(WaveFunType=="manual"):
            params =ParameterVector('theta', length=self.nqubits*2)
            it = iter(params)
            wavefunction = QuantumCircuit(self.nqubits)
            for i in range(0, self.nqubits):
                wavefunction.ry(2 * next(it), i)

            #adding extra layers
            for i in range(0,self.nqubits-1):
                wavefunction.cz(0,1)

                for j in range(0, self.nqubits):
                    wavefunction.ry(2 * next(it), j)

        ## part 2:   EfficientSU2 ansatz
        elif(WaveFunType =="EfficientSU2"):
            wavefunction = EfficientSU2(num_qubits = self.nqubits, entanglement="pairwise")

        ## part 3:   RealAmplitudes ansatz
        elif(WaveFunType =="RealAmplitudes"):
            wavefunction = RealAmplitudes(num_qubits=self.nqubits, reps=3)
        else:
            print("the given ansatz type is currently not supported")
        print(wavefunction)
        self.circuit_plot = wavefunction.decompose().draw(output='mpl')
      
        #######using VQE######
        vqe = VQE(ansatz=wavefunction, optimizer=self.optimizer, quantum_instance=self.backend)
        self.min_eigen_optimizer = MinimumEigenOptimizer(vqe)
        ######################

In [None]:
# MILPproblem requires BinaryProblemBody, VQE_optimizer and Penalty value 
# as well as number of classical and binary variables and boundary for continuous variables

# The add_constrain method can add constrains from outside of the class and the BinaryProblemBody's 
# objective function via the updateConstrain method by adding a penalty term. 
# Currently, it supports for equality constrains only.

class MILPproblem:
    def __init__(self,body,VQE_Optimizer,Penalty, num_cl_vars, num_qbits, bounds):
        self.num_cl_vars = num_cl_vars
        self.num_qbits = num_qbits
        self.bounds = bounds
        self.body = body
        self.Optimizer = VQE_Optimizer
        self.Constrains = []
        self.Penalty = Penalty
        
    def CalcBinaryOptimum(self):
        self.body.EvaNewBinaryOpti(self.Optimizer.min_eigen_optimizer)
        
    def continousProblem(self,u,x):
        #Two parts are involved, but x(binary parts) will be treated as parameter(fixed) 
        return self.body.Expr(u,x)
        
    def updateBinary(self,u):
        self.body.UpdateExpr(u)
        
    def getBinaryOpti(self):
        return self.body.opti
    
    def getBinaryOptiValue(self):
        return self.body.optivalue
    
    def add_constrain(self,Expr,Op,target):
        self.Constrains.append(constrains(Expr,Op,target))
    
    def updateConstrain(self):
        self.EQConstrain = []
        for cons in self.Constrains:
            if(cons.op == "=="):
                #update the subject function(Expr) according to the equality constrain
                # Expr = Expr + penalty * (constrain-target)^2
                self.EQConstrain.append(lambda u,x: self.Penalty*pow((cons.Expr(u,x)-cons.Target),2))
        self.body.updateBodyExpr(self.EQConstrain)
        
    def removeConstrain(self):
        self.body.cleanConstrain()
        
    def testVadility(self,res):
        delta = 1e-04
        for cons in self.EQConstrain:
            if(cons(res[0],res[1])>delta):
                return False
        return True
    
    def getBinProbMinObjValue(self,u):
        self.body.UpdateExpr(u)
        self.CalcBinaryOptimum()
        return self.body.optivalue

In [None]:
# The MILPAlternatingsolver is initialized with a MILPproblem and solves it with an alternating strategy.
# The alternating strategy works as following: 
#    1. Give an initial guess of the continous variables
#    2. Update the continous part of the body expression of the MILPproblem, so that it will be a pure QUBO problem; solve the
#       QUBO problem via the VQE solver.
#    3. Update the body expression of the MILPproblem via the result of step 2. 
#       The problem becomes pure continous, it will be solved with an classical optimizer.
#    4. If the generated sequence starts 'converging' (i.e. update between old and new small), then exit, else go to step 2.
class MILPAlternatingsolver:
    def __init__(self, MILPbody):
        self.MILPbody = MILPbody
        
    def __distance(self,x,x1):
        x_ = np.array(x)
        x1_ = np.array(x1)
        return np.linalg.norm(x_-x1_,2)
    
    def solver(self,u_init, verbose=False):
        bounds = self.MILPbody.bounds
        solution_old = [0]*(self.MILPbody.num_cl_vars+self.MILPbody.num_qbits)
        step = 0
        self.MILPbody.updateBinary(u_init)
        # use alternating strategy until update between old/new value becomes small (or until at most 1e4 steps done)
        while step<1E4:
            step = step + 1
            #find optimum of binary problem
            self.MILPbody.CalcBinaryOptimum()
            x =self.MILPbody.getBinaryOpti()

            # Get constrains with fixed binary value
            constr = []
            for constraint in self.MILPbody.EQConstrain:
                constr.append({'type': 'eq', 'fun': lambda u: constraint(u,x)})
            # Optimize purely continuous problem
            res = optimize.shgo(self.MILPbody.body.initExpr, bounds, args = (x,), constraints=constr)
            u = res.x
            # update binary problem
            self.MILPbody.updateBinary(u)

            solution_new = list(x) + list(u)
            if verbose:
                displayDetail(step,u,x)
            if(self.__distance(solution_new,solution_old)<1E-8):
                break
            solution_old = solution_new
        return [u,x] 

In [None]:
class MILPSurrogateSolver:
    def __init__(self, MILPbody):
        self.MILPbody = MILPbody
        
    def __SBO(self,point):
        return self.t.predict_values(point)
    
    def __findCurrentOptimum(self,obj,npoints,bound):
        r_TR_or = [bound[i][1]-bound[i][0] for i in range(len(bound))]
        # generate initial points for optimization
        x_start = np.zeros([80,len(bound)])
        for i in range(len(bound)):
            x_start[:,i] = bound[i][0] + np.random.rand(80)*r_TR_or[i]
        #define the objective function:
        opt_all = np.array([minimize(lambda x: float(obj(x)), x_st, method='SLSQP', bounds=bound) for x_st in x_start])
        opt_success = opt_all[[opt_i['success'] for opt_i in opt_all]]
        obj_success = np.array([opt_i['fun'] for opt_i in opt_success])
        #return the optimum point
        ind_min = np.argmin(obj_success)
        opt = opt_success[ind_min]
        return opt['x']
    
    def solver(self,npoints,bound,maxIter):
        # generate initial sampling points and calculate their corresponding minmum value
        sampling = LHS(xlimits = bound, criterion='ese', random_state=1)
        # u_base and y_base are used as the data set for training the surrogate model
        self.u_base = sampling(npoints)
        self.y_base = []
        # y_old and x_old are used to record the best point of surrogate model for last step
        x_old  = np.array([-1000,-1000])
        y_old  = 1
        
        # evaluate all initial data points in base data set in order to build the surrogate model
        for u in self.u_base:
            self.y_base.append(self.MILPbody.getBinProbMinObjValue(u))
        # building surrogate using Kriging model
        # more information please refer:
        # https://www.sciencedirect.com/science/article/pii/S100093611930041X
        self.t = KRG(theta0=[1e-4]*len(bound),print_prediction = False)
        self.t.set_training_values(self.u_base,np.array(self.y_base))
        self.t.train()
        
        # this objective funtion stands for the finding the best point of the current surrogate model
        # which is a pure continuous problem. 
        obj_k = lambda x: self.__SBO(np.atleast_2d(x))
        
        for i in range(maxIter):# start to refine the surrogate model by adding maximal 'maxInter' points 
            #find the best point of the current surrogate model
            x_et_k = self.__findCurrentOptimum(obj_k,80,bound)
            
            #check if the result is converged, if yes, return the result.
            diff = x_et_k-x_old
            if(np.linalg.norm(diff)<1e-3):
                print("find optimal and break")
                self.MILPbody.updateBinary(x_et_k)
                self.MILPbody.CalcBinaryOptimum()
                x_binary =self.MILPbody.getBinaryOpti()
                return [list(x_et_k),list(x_binary)]
            
            #evaluate the new data
            #y_et_k is the true value calculated based on the current best point position.
            y_et_k = self.MILPbody.getBinProbMinObjValue(x_et_k)
            if(y_et_k<y_old):
                center = x_et_k
            else:
                center = x_old
            #converting nparray into list
            self.y_base.append(y_et_k)
            ulist = self.u_base.tolist()
            ulist.append(x_et_k.tolist())
            self.u_base = np.array(ulist)
            x_old = x_et_k
            y_old = y_et_k
            
            # adding curent best point to the base data site
            self.t.set_training_values(self.u_base,np.array(self.y_base))
            self.t.train()
        self.MILPbody.updateBinary(center)
        self.MILPbody.CalcBinaryOptimum()
        x_binary =self.MILPbody.getBinaryOpti()
        return [list(center),list(x_binary)]
    
    def visualization(self,bound):
        xlabel = np.linspace(bound[0][0], bound[0][1], 50)
        ylabel = np.linspace(bound[1][0], bound[1][1],50)
        resSM = []
        varSM = []
        x2D = []
        for xx0 in xlabel:
            for xx1 in ylabel:
                x2D.append(np.array([xx0,xx1]))
                resSM.append(self.t.predict_values(np.array([[xx0,xx1]])))
                varSM.append(self.t.predict_variances(np.array([[xx0,xx1]])))
        resSM = np.array(resSM)
        resSM = resSM.reshape((50,50)).T
        varSM = np.array(varSM)
        varSM = varSM.reshape((50,50)).T                     
        X,Y = np.meshgrid(xlabel,ylabel)

        fig = plt.figure(figsize=(15, 10))
        ax = fig.gca(projection='3d')
        ax.scatter(self.u_base[:,0], self.u_base[:,1], self.y_base, zdir='z', marker='x', c='b', s=200, label='DOE')
        surf = ax.plot_surface(X, Y, resSM, cmap=cm.coolwarm,
                               linewidth=0, antialiased=False,alpha=0.5)

        plt.legend()
        plt.show()

# Creating a MILP problem

In [None]:
# We define the following MILP (binary)
#     objektive function: x1*u1 - x1*x2 + x1*x2*u2 - u1+ 5*u2 + u1*u2 + u1*u1 + u2*u2
#     binary variables : x1,x2
#     continous variables: u1,u2
#     constraint (bounds) for continuous variables: -2<u1<2,-2<u2<3
#     and some additional constraints, see below.

# We have to specify the number of qubits (i.e. binary variables)
nQubit = 2
# and the number of continuous variables
num_classical_vars = 2

# the objective function
BodyExpr = lambda u,x:x[0]*u[0] - x[1]*u[1] + x[0]*x[1]\
                    - u[0]+ 5*u[1]+ u[0]*u[1]+u[0]*u[0]

# and we define the additional constraints
Constrain1 = lambda u,x: x[0]*u[0] - x[1]*u[1]
Constrain2 = lambda u,x: x[0]*u[0] + x[1]*u[1]
# those will be implemented as soft-constraints in the QUBO with penalty factor, i.e. something like
# ObjectiveFunction = lambda u,x:BodyExpr(u,x)+Penalty*pow(Constrain1(u,x),2)
Penalty = 30
# and bounds
bounds_classical_vars = np.array([(-2,2),(-2,3)])

# create binary optimization problem with the expression above
binaryP = BinaryProblemBody(BodyExpr,nQubit)

# then we set up the VQE algorithm with ansatz "RealAmplitudes" 
backend = Aer.get_backend('qasm_simulator') # We explicitly choose a backend; otherwise default aer_simulator_statevector will be used
optimizer = COBYLA() # default if not set; other options: L_BFGS_B, GSLS, GradientDescent
BinaryOptimizer = VQE_Optimizer(nQubit,"RealAmplitudes", backend, optimizer=optimizer)   

# and finally we create the MILP problem
MILP = MILPproblem(binaryP,BinaryOptimizer,Penalty, nQubit, num_classical_vars, bounds_classical_vars)

# and add one constraint MILPproblem in the sense Constrain1 = 2.0 (last argument specifies target value for the constraint)
MILP.add_constrain(Constrain1,"==",2.0)
# MILP.add_constrain(Constrain2,"==",2.0)

# after adding, the constrain should be updated. The 
MILP.updateConstrain()

# Using the alternating solver to find the solution

In [None]:
solver = MILPAlternatingsolver(MILP)

## The final optimum is initial condition depedent
An expected problem is that due to local minima or flat areas we might not find a global optimum. See the following example using onle one constraint and to different initial values for the continuous variables.

In [None]:
res1 = solver.solver([1,0], verbose=True)
pretty_print_optimizer(res1)
print(f"Optimum: {BodyExpr(res1[0],res1[1])}")

For another inital value we arrive at another point (and we reduce the solvers verbosity)

In [None]:
res2 = solver.solver([0,-2])
pretty_print_optimizer(res2)
print(f"Optimum: {BodyExpr(res2[0],res2[1])}")

## Use LHS to find the possibly best optimum
One way to deal with this problem is to use many different intial values for the continuous variables and take the minimum of the resulting local 'optima'. We use latin hypercube sampling (LHS) to define the initial values.

In [None]:
#using LHS to generate the initial points
#select the best result starting from different initial points
#it is not guaranteed to find the global optimum, but works well for a lot situation
node = 10
sampling = LHS(xlimits = bounds_classical_vars, criterion='ese', random_state=1)
Uinit = sampling(node)
optimum =float('Inf')
for uinit in Uinit:
    resi = solver.solver(uinit)
    #check if the solution is valid (constraints satisfied) and smaller than the old optimum
    if(MILP.testVadility(resi) and BodyExpr(resi[0],resi[1])<optimum):
        optimum = BodyExpr(resi[0],resi[1])
        optimumResult = resi

In [None]:
if optimum<float('Inf'):
    print(f"Optimum found is: {optimum} with")
    pretty_print_optimizer(optimumResult)
else:
    print("No feasible optimum found.")

# Using the surrogate model to solve the problem
As an alternative approach we can use the surrogate solver that builds and refines a continuous Kriging surrogate model by sampling the MILP problem, i.e. optimizing the binary problem for fixed continuous sampling points.
Then the surrogate model is minimized and the resulting point is added as sampling point if the update is larger than 1e-3.

In [None]:
SurrogateSolver = MILPSurrogateSolver(MILP)

In [None]:
# the solver of surrogate mode based method requires 3 input paramters:
# 1. Number of initial point of the surrogate model
# 2. Boundary of the continuous variable
# 3. Maximum iteration of refinement
center = SurrogateSolver.solver(40,bounds_classical_vars,20)

We can test if the solution is a feasible solution (since the surrogate model does not necessarily satisfies the constraints due to the formulation as soft constraints we should check):

In [None]:
MILP.testVadility(center)

In [None]:
print(f"Optimum found: {BodyExpr(center[0],center[1])}")

Let's visualize how the surrogate model looked like and how the refinement worked

In [None]:
SurrogateSolver.visualization(bounds_classical_vars)

## Let's add another constraint

In [None]:
# Recall that we considered the problem
#     objective function: x1*u1 - x1*x2 + x1*x2*u2 - u1+ 5*u2 + u1*u2 + u1*u1 + u2*u2
#     binary variables: x1,x2
#     continous variables: u1,u2
#     with constraints: -2<u1<2,
#                       -2<u2<3,
#                       x[0]*u[0] - x[1]*u[1] = 2.0

# We add the constraint 
# Constrain2: x[0]*u[0] + x[1]*u[1] = 2.0
MILP.add_constrain(Constrain2,"==",2.0)

# after adding, the constrain should be updated.
MILP.updateConstrain()

In [None]:
node = 10
sampling = LHS(xlimits = bounds_classical_vars, criterion='ese', random_state=1)
Uinit = sampling(node)
solver = MILPAlternatingsolver(MILP)
optimum =float("Inf")
for uinit in Uinit:
    resi = solver.solver(uinit)
    #check if the solution is valid (constraints satisfied) and smaller than the old optimum
    if(MILP.testVadility(resi) and BodyExpr(resi[0],resi[1])<optimum):
        optimum = BodyExpr(resi[0],resi[1])
        optimumResult = resi

In [None]:
if optimum<float('Inf'):
    print(f"Optimum found is: {optimum} with")
    pretty_print_optimizer(optimumResult)
else:
    print("No feasible optimum found.")

In [None]:
SurrogateSolver = MILPSurrogateSolver(MILP)
center = SurrogateSolver.solver(40,bounds_classical_vars,20)

In [None]:
pretty_print_optimizer(center)
print(BodyExpr(center[0],center[1]))
print(MILP.testVadility(center))

In the first example, alternating solver showed a good balance between accuracy and calculation cost. However, in the second 
example, it is failed to find a feasible point.

## Conclusions
Both algorithms delivered in the test cases feasible results. Since we needed less (expensive) QUBO evaluations for the alternating directions approach might be better suited for current small NISQ devices. However, in particular for larger problems and on future quantum devices, the surrogate model might also be interesting since it is not purely local as the alternating directions approach.

**Copyright 2023 Tobias Haas and Qifeng Pan, University of Stuttgart - HLRS**

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.