In [None]:
# Importing standard Qiskit libraries
from qiskit import QuantumCircuit, transpile
from qiskit.tools.jupyter import *
from qiskit.visualization import *
from ibm_quantum_widgets import *

# qiskit-ibmq-provider has been deprecated.
# Please see the Migration Guides in https://ibm.biz/provider_migration_guide for more detail.
from qiskit_ibm_runtime import QiskitRuntimeService, Sampler, Estimator, Session, Options

from docplex.mp.model import Model
from qiskit.circuit.library import RealAmplitudes
from qiskit.utils import algorithm_globals
from qiskit.algorithms.minimum_eigensolvers import NumPyMinimumEigensolver, QAOA, SamplingVQE
from qiskit.algorithms.optimizers import COBYLA
# from qiskit.primitives import Sampler
from qiskit_optimization.algorithms import MinimumEigenOptimizer
from qiskit_optimization.translators import from_docplex_mp
from qiskit_optimization.converters import InequalityToEquality, IntegerToBinary, LinearEqualityToPenalty, QuadraticProgramToQubo
from qiskit.result import QuasiDistribution

import networkx as nx
import numpy as np
from numpy.random import randint
import matplotlib.pyplot as plt
import pickle

# Loading your IBM Quantum account(s)
service = QiskitRuntimeService(channel="ibm_quantum")
backend = service.get_backend('ibmq_qasm_simulator')
sampler = Sampler(backend=backend)

In [None]:
def create_random_instance(n: int, low: int = 0, high: int = 100, seed: int = None) -> "Tsp":
    """Create a random instance of the constraint satisfaction problem

    Args:
        n: the number of nodes.
        low: The minimum value for the coordinate of a node.
        high: The maximum value for the coordinate of a node.
        seed: the seed for the random coordinates.

    Returns:
         A Csp instance created from the input information
    """
    if seed:
        algorithm_globals.random_seed = seed
    coord = algorithm_globals.random.uniform(low, high, (n, 2))
    pos = {i: (coord_[0], coord_[1]) for i, coord_ in enumerate(coord)}
    graph = nx.random_geometric_graph(n, np.hypot(high - low, high - low) + 1, pos=pos)
    for w, v in graph.edges:
        delta = [graph.nodes[w]["pos"][i] - graph.nodes[v]["pos"][i] for i in range(2)]
        graph.edges[w, v]["weight"] = np.rint(np.hypot(delta[0], delta[1]))
        graph.edges[w, v]["resource"] = randint(20, 200)

    return graph

class Graph():
    def __init__(self, graph):
        self._graph = graph
        self._colors = ["r" for node in self._graph.nodes]
        self._positions = [self._graph.nodes[node]["pos"] for node in self._graph.nodes]
    
    def draw_graph(self):
        default_axes = plt.axes(frameon=True)
        nx.draw_networkx(self._graph, node_color=self._colors, alpha=0.8, ax=default_axes, pos=self._positions)
        edge_labels = {}
        for u, v, data in self._graph.edges(data=True):
            edge_labels[u, v] = f"{data['weight']}\n{data['resource']}"
        nx.draw_networkx_edge_labels(self._graph, pos=self._positions, edge_labels=edge_labels)
    
    def draw_result(self, result) -> None:
        """Draw the result with colors

        Args:
            result : The calculated result for the problem
        """
        nx.draw(self._graph, with_labels=True, pos=self._positions)
        nx.draw_networkx_edges(
            self._graph,
            self._positions,
            edgelist=edgelist(result),
            width=8,
            alpha=0.5,
            edge_color="tab:red",
        )
        
def sample_most_likely(state_vector):
    from collections import OrderedDict
    from qiskit.opflow import StateFn
    """Compute the most likely binary string from state vector.
    Args:
        state_vector (numpy.ndarray or dict): state vector or counts.
    Returns:
        numpy.ndarray: binary string as numpy.ndarray of ints.
    """
    if isinstance(state_vector, QuasiDistribution):
        values = list(state_vector.values())
        n = int(np.log2(len(values)))
        k = np.argmax(np.abs(values))
        x = bitfield(k, n)
        x.reverse()
        x = np.asarray(x)
        return x
    elif isinstance(state_vector, (OrderedDict, dict)):
        # get the binary string with the largest count
        binary_string = sorted(state_vector.items(), key=lambda kv: kv[1])[-1][0]
        x = np.asarray([int(y) for y in reversed(list(binary_string))])
        return x
    elif isinstance(state_vector, StateFn):
        binary_string = list(state_vector.sample().keys())[0]
        x = np.asarray([int(y) for y in reversed(list(binary_string))])
        return x
    else:
        n = int(np.log2(state_vector.shape[0]))
        k = np.argmax(np.abs(state_vector))
        x = np.zeros(n)
        for i in range(n):
            x[i] = k % 2
            k >>= 1
        return x

def interpret(result, n):
    """Interpret a result as a list of node indices

    Args:
        result : The calculated result of the problem

    Returns:
        A list of nodes whose indices correspond to its order in a prospective cycle.
    """
    c = 0
    res = []
    for i in range(n):
        has_edge = False
        for j in range(i+1, n):
            if result[c]==1:
                res.append(j)
                has_edge = True
            c+=1
        if not has_edge:
            res.append(-1)
    return res


def edgelist(route: np.ndarray):
    # Arrange route and return the list of the edges for the edge list of nx.draw_networkx_edges
    return [(i, route[i]) for i in range(len(route)) if route[i] != -1]

def bitfield(n: int, L: int) -> list[int]:
    result = np.binary_repr(n, L)
    return [int(digit) for digit in result]  # [2:] to chop off the "0b" part


def resource_used(solution, g):
    used = 0
    for start, end in enumerate(solution):
        if end != -1:
            used += g._graph.edges[start, end]["resource"]
    return used

# callback to store intermediate results
def callback(evaluation_count, optimizer_parameters, estimated_value, metadata, intermediate_data):
    # we translate the objective from the internal Ising representation
    # to the original optimization problem
    intermediate_data += [np.real_if_close(-(estimated_value + offset)),]

    
algorithm_globals.random_seed = 10598


In [None]:
# the plot shows that the right answer does not have the highest probability. I'm not sure why 
from qiskit_optimization.algorithms import SolutionSample, OptimizationResultStatus
from qiskit.visualization import plot_distribution


def get_filtered_samples(
    samples: "list[SolutionSample]",
    n_var: int, 
    threshold: float = 0,
    allowed_status: "tuple[OptimizationResultStatus]" = (OptimizationResultStatus.SUCCESS,),
):
    res = []
    n_var = (n_var * (n_var - 1)) // 2
    samples_for_plot = {
        str(s.x[:n_var]): 0.0 for s in samples
    }
    for s in samples:
        if s.status in allowed_status and str(s.x[:n_var]) in samples_for_plot:
            if sum(s.x[:n_var]) == 0:
                continue
            samples_for_plot[str(s.x[:n_var])] += s.probability
    
    total_prob = sum(samples_for_plot.values())
    samples_for_plot = {k: v/total_prob for k, v in samples_for_plot.items()}
    samples_for_plot = {k: v for k, v in samples_for_plot.items() if v >= threshold}
    return samples_for_plot

def plot_samples(result, samples, n_var):
    samples_for_plot = {
        # " ".join(f"{result.variables[i].name}={int(v)}" for i, v in enumerate(s.x)): s.probability
        # for s in samples
        str(s.x): s.probability 
    }
    return samples_for_plot

In [None]:
def valid_csp_instance(n_node):
    is_valid = False
    while not is_valid:
        # Create an instance of a model and variables with DOcplex.
        max_resource_constraint = 100

        # initial graph
        csp_instance = create_random_instance(n_node)
        graph = Graph(csp_instance)

        adj_matrix = nx.to_numpy_array(graph._graph)
        # print("distance\n", adj_matrix)

        mdl = Model(name=f'{n_node}_csp')
        x = {(i,j): mdl.binary_var(name=f'({i}->{j})') for i, j in graph._graph.edges}

        # Object function
        objective_func = mdl.sum(
                    graph._graph.edges[i, j]["weight"] * x[(i, j)] 
                    for i, j in graph._graph.edges
                ) #TODO: expand this to account for uncertainty
        loss_func = mdl.sum(
                    graph._graph.edges[i, j]["resource"] * x[(i, j)] 
                    for i, j in graph._graph.edges
                )
        # csp_func = objective_func + alpha * (loss_func - max_resource_constraint)
        csp_func = objective_func

        mdl.minimize(csp_func)

        # Constraints
        for i in range(1, n_node-1):
            # flow conservation
            mdl.add_constraint(mdl.sum(x[(k,i)] for k in range(0, i) if i != k) - \
                                mdl.sum(x[(i,j)] for j in range(i, n_node) if i != j) == 0)

        mdl.add_constraint(mdl.sum(x[(0,i)] for i in range(1, n_node)) == 1)
        mdl.add_constraint(mdl.sum(x[(i, n_node-1)] for i in range(n_node-1)) == 1)

        # constraint satisfaction 
        mdl.add_constraint(mdl.sum(
                    graph._graph.edges[i, j]["resource"] * x[(i, j)] 
                    for i, j in graph._graph.edges
                ) <= max_resource_constraint)

        # Call the method to convert the model into Ising Hamiltonian.
        qp = from_docplex_mp(mdl)
        # convert qb to quantum computing friendly format
        qubo = QuadraticProgramToQubo().convert(qp)

        # print(qubo.prettyprint())
        uncertain_qubitOp, offset = qubo.to_ising()

        # Making the Hamiltonian in its full form and getting the lowest eigenvalue and eigenvector
        exact_mes = NumPyMinimumEigensolver()
        exact = MinimumEigenOptimizer(exact_mes)
        classical_result = exact.solve(qubo)

        classical_most_likely = list(classical_result.variables_dict.values())
        classical_solution = interpret(classical_most_likely, n_node)
        is_valid = n_node-1 in classical_solution
    print(classical_result.prettyprint())
    print("resource_used:", resource_used(classical_solution, graph))
    return qubo


#### CVaR

In [None]:
objective_values = np.zeros(2**num_node)
for i in range(2**num_node):
    x_bin = ('{0:0%sb}' % num_node).format(i)
    x = [0 if x_ == '0' else 1 for x_ in reversed(x_bin)]
    objective_values[i] = qp.objective.evaluate(x)
ind = np.argsort(objective_values)

# evaluate final optimal probability for each alpha
probabilities = np.zeros(len(objective_values))
for alpha in alphas:
    if backend_name == 'qasm_simulator':
        counts = results[alpha].min_eigen_solver_result.eigenstate
        shots = sum(counts.values())
        for key, val in counts.items():
            i = int(key, 2)
            probabilities[i] = val / shots
    else:
        probabilities = np.abs(results[alpha].min_eigen_solver_result.eigenstate)**2
    print('optimal probabilitiy (alpha = %.2f):  %.4f' % (alpha, probabilities[ind][-1:]))

#### with uncertainty in distance matrix

In [None]:

def create_random_uncertain_instance(n: int, low: int = 0, high: int = 100, seed: int = None) -> "Tsp":
    """Create a random instance of the constraint satisfaction problem

    Args:
        n: the number of nodes.
        low: The minimum value for the coordinate of a node.
        high: The maximum value for the coordinate of a node.
        seed: the seed for the random coordinates.

    Returns:
         A Csp instance created from the input information
    """
    if seed:
        algorithm_globals.random_seed = seed
    coord = algorithm_globals.random.uniform(low, high, (n, 2))
    pos = {i: (coord_[0], coord_[1]) for i, coord_ in enumerate(coord)}
    graph = nx.random_geometric_graph(n, np.hypot(high - low, high - low) + 1, pos=pos)
    for w, v in graph.edges:
        delta = [graph.nodes[w]["pos"][i] - graph.nodes[v]["pos"][i] for i in range(2)]
        graph.edges[w, v]["weight"] = np.rint(np.hypot(delta[0], delta[1]))
        graph.edges[w, v]["weight2"] = graph.edges[w, v]["weight"] + 15
        delta2 = [graph.nodes[w]["pos"][i] + graph.nodes[v]["pos"][i] for i in range(2)]
        graph.edges[w, v]["resource"] = np.rint(np.hypot((delta[0] + delta[1]), (delta[0] + delta[1])))

    return graph

In [None]:
from itertools import product

def generate_binary_permutations(n):
    if n <= 0:
        return []

    binary_permutations = list(product([0, 1], repeat=n))
    return binary_permutations

def generate_permutations(list1, list2):
    binary_permutations = generate_binary_permutations(len(list1))
    combined_list = [list1, list2]
    all_perm = []
    for permutation in binary_permutations:
        single_perm = []
        for idx, list_no in enumerate(permutation):
            single_perm.append(combined_list[list_no][idx])
        all_perm.append(single_perm)
    return all_perm

l1 = [1,2,3]
l2 = ['a','b','c']
permutations = generate_permutations(l1,l2)
for perm in permutations:
    print(perm)

In [None]:
# Create an instance of a model and variables with DOcplex.
num_node = 3
alpha = 100
max_resource_constraint = 100

# initial graph
csp_instance = create_random_instance(num_node)
graph = Graph(csp_instance)

adj_matrix = nx.to_numpy_array(graph._graph)
# print("distance\n", adj_matrix)

graph.draw_graph()


mdl = Model(name='csp')
x = {(i,j): mdl.binary_var(name=f'({i}->{j})') for i, j in graph._graph.edges}

# Object function
objective_func = mdl.sum(
            graph._graph.edges[i, j]["weight"] * x[(i, j)] 
            for i, j in graph._graph.edges
        ) #TODO: expand this to account for uncertainty
loss_func = mdl.sum(
            graph._graph.edges[i, j]["resource"] * x[(i, j)] 
            for i, j in graph._graph.edges
        )
# csp_func = objective_func + alpha * (loss_func - max_resource_constraint)
csp_func = objective_func

mdl.minimize(csp_func)

# Constraints
for i in range(1, num_node-1):
    # flow conservation
    mdl.add_constraint(mdl.sum(x[(k,i)] for k in range(0, i) if i != k) - \
                        mdl.sum(x[(i,j)] for j in range(i, num_node) if i != j) == 0)

mdl.add_constraint(mdl.sum(x[(0,i)] for i in range(1, num_node)) == 1)
mdl.add_constraint(mdl.sum(x[(i, num_node-1)] for i in range(num_node-1)) == 1)

# constraint satisfaction 
mdl.add_constraint(mdl.sum(
            graph._graph.edges[i, j]["resource"] * x[(i, j)] 
            for i, j in graph._graph.edges
        ) <= max_resource_constraint)

# Call the method to convert the model into Ising Hamiltonian.
qp = from_docplex_mp(mdl)
# convert qb to quantum computing friendly format
qubo = QuadraticProgramToQubo().convert(qp)

print(qp.prettyprint())
# print(qubo.prettyprint())
uncertain_qubitOp, offset = qubo.to_ising()

# Making the Hamiltonian in its full form and getting the lowest eigenvalue and eigenvector
exact_mes = NumPyMinimumEigensolver()
exact = MinimumEigenOptimizer(exact_mes)
classical_result = exact.solve(qubo)

# classical_most_likely = sample_most_likely(classical_result.eigenstate.to_dict())
classical_most_likely = list(classical_result.variables_dict.values())
print(classical_result.prettyprint())

classical_solution = interpret(classical_most_likely, num_node)
graph.draw_result(classical_solution)
print("resource_used:", resource_used(classical_solution, graph))

In [None]:
# Create an instance of a model and variables with DOcplex.
num_node = 3
alpha = 1
max_resource_constraint = 100

# initial graph
uncertain_csp_instance = create_random_uncertain_instance(num_node)
uncertain_graph = Graph(uncertain_csp_instance)

adj_matrix = nx.to_numpy_array(graph._graph)
# print("distance\n", adj_matrix)

uncertain_graph.draw_graph()

In [None]:
uncertain_mdl = Model(name='uncertain csp')
x = {(i,j): uncertain_mdl.binary_var(name=f'({i}->{j})') for i, j in uncertain_graph._graph.edges}

# Object function
perms = generate_binary_permutations(uncertain_graph._graph.number_of_edges())
uncertain_objective_func = uncertain_mdl.sum(1./len(perms) * uncertain_mdl.sum(
            (uncertain_graph._graph.edges[edge]["weight"] if list_no==0 else uncertain_graph._graph.edges[edge]["weight2"]) * x[edge] 
            for list_no, edge in zip(perm, uncertain_graph._graph.edges)
        ) for perm in perms)

uncertain_loss_func = uncertain_mdl.sum(
            uncertain_graph._graph.edges[i, j]["resource"] * x[(i, j)] 
            for i, j in uncertain_graph._graph.edges
        )
# uncertain_csp_func = uncertain_objective_func + alpha * ((uncertain_loss_func - max_resource_constraint)**2)
uncertain_csp_func = uncertain_objective_func

uncertain_mdl.minimize(uncertain_csp_func)

# Constraints
for i in range(1, num_node-1):
    # flow conservation
    uncertain_mdl.add_constraint(uncertain_mdl.sum(x[(k,i)] for k in range(0, i) if i != k) - \
                        uncertain_mdl.sum(x[(i,j)] for j in range(i, num_node) if i != j) == 0)

uncertain_mdl.add_constraint(uncertain_mdl.sum(x[(0,i)] for i in range(1, num_node)) == 1)
uncertain_mdl.add_constraint(uncertain_mdl.sum(x[(i, num_node-1)] for i in range(num_node-1)) == 1)

# constraint satisfaction 
uncertain_mdl.add_constraint(uncertain_mdl.sum(
            uncertain_graph._graph.edges[i, j]["resource"] * x[(i, j)] 
            for i, j in uncertain_graph._graph.edges
        ) <= max_resource_constraint)

# Call the method to convert the model into Ising Hamiltonian.
uncertain_qp = from_docplex_mp(uncertain_mdl)
# convert qb to quantum computing friendly format
uncertain_qubo = QuadraticProgramToQubo().convert(uncertain_qp)

print(uncertain_qp.prettyprint())
# print(qubo.prettyprint())
uncertain_uncertain_qubitOp, uncertain_offset = uncertain_qubo.to_ising()

# Making the Hamiltonian in its full form and getting the lowest eigenvalue and eigenvector
uncertain_exact_mes = NumPyMinimumEigensolver()
uncertain_exact = MinimumEigenOptimizer(uncertain_exact_mes)
uncertain_classical_result = uncertain_exact.solve(uncertain_qubo)

# classical_most_likely = sample_most_likely(classical_result.eigenstate.to_dict())
uncertain_classical_most_likely = list(uncertain_classical_result.variables_dict.values())
print(uncertain_classical_result.prettyprint())

uncertain_classical_solution = interpret(uncertain_classical_most_likely, num_node)
uncertain_graph.draw_result(uncertain_classical_solution)

In [None]:
# run variational optimization for different values of rep
reps = [1, 2, 3]  # reps to be evaluated
# dictionaries to store optimization progress and results
rep_objectives = {rep: [] for rep in reps}  # set of tested objective functions w.r.t. rep
rep_results = {}  # results of minimum eigensolver w.r.t rep

# loop over all given rep values
for rep in reps:
    algorithm_globals.random_seed = 10598
    uncertain_ansatz = RealAmplitudes(uncertain_qubitOp.num_qubits, reps=rep)
    uncertain_optimizer = COBYLA()
    uncertain_vqe_mes = SamplingVQE(sampler, uncertain_ansatz, uncertain_optimizer, callback=lambda a,b,c,d: callback(a,b,c,d, rep_objectives[rep])) #n layer
    uncertain_vqe = MinimumEigenOptimizer(uncertain_vqe_mes)
    
    # solve problem
    rep_results[rep] = uncertain_vqe.solve(qubo)
    
    # print results
    print('rep = {}:'.format(rep))
    print(rep_results[rep])
    print()

In [None]:
# run variational optimization for different values of alpha
alphas = [1.0, 0.50, 0.25]  # confidence levels to be evaluated
# dictionaries to store optimization progress and results
alpha_objectives = {alpha: [] for alpha in alphas}  # set of tested objective functions w.r.t. alpha
alpha_results = {}  # results of minimum eigensolver w.r.t alpha

# loop over all given alpha values
for alpha in alphas:
    algorithm_globals.random_seed = 10598
    uncertain_ansatz = RealAmplitudes(uncertain_qubitOp.num_qubits, reps=1)
    uncertain_optimizer = COBYLA()
    uncertain_vqe_mes = SamplingVQE(sampler, uncertain_ansatz, uncertain_optimizer, aggregation=alpha, callback=lambda a,b,c,d: callback(a,b,c,d, alpha_objectives[alpha])) #CVaR
    uncertain_vqe = MinimumEigenOptimizer(uncertain_vqe_mes)
    
    # solve problem
    alpha_results[alpha] = uncertain_vqe.solve(qubo)
    
    # print results
    print('alpha = {}:'.format(alpha))
    print(alpha_results[alpha])
    print()

In [None]:
# run variational optimization for different values of max_iter
max_iters = [100, 200, 400, 800]  # max_iter to be evaluated
# dictionaries to store optimization progress and results
max_iter_objectives = {max_iter: [] for max_iter in max_iters}  # set of tested objective functions w.r.t. max_iter
max_iter_results = {}  # results of minimum eigensolver w.r.t max_iter

# loop over all given max_iter values
for max_iter in max_iters:
    algorithm_globals.random_seed = 10598
    uncertain_ansatz = RealAmplitudes(uncertain_qubitOp.num_qubits, reps=1)
    uncertain_optimizer = COBYLA(maxiter=max_iter)
    uncertain_vqe_mes = SamplingVQE(sampler, uncertain_ansatz, uncertain_optimizer, callback=lambda a,b,c,d: callback(a,b,c,d, max_iter_objectives[max_iter])) #max iter
    uncertain_vqe = MinimumEigenOptimizer(uncertain_vqe_mes)
    
    # solve problem
    max_iter_results[max_iter] = uncertain_vqe.solve(qubo)
    
    # print results
    print('max_iter = {}:'.format(max_iter))
    print(max_iter_results[max_iter])
    print()

In [None]:
# plot resulting history of max_iter objective values
max_iters = [100, 200, 400, 800]
plt.figure(figsize=(10, 5))
plt.plot([0, max(max_iters)], [uncertain_classical_result.fval, uncertain_classical_result.fval], 'r--', linewidth=2, label='optimum')
for max_iter in max_iters:
    plt.plot(max_iter_objectives[max_iter], label='max_iter: {}'.format(max_iter), linewidth=2)
plt.legend(loc='lower right', fontsize=14)
plt.xlim(0, max(max_iters))
plt.xticks(fontsize=14)
plt.xlabel('iterations', fontsize=14)
plt.yticks(fontsize=14)
plt.ylabel('objective value', fontsize=14)
plt.savefig('max_iter.pdf', format='pdf', bbox_inches='tight')
# plt.show()

In [None]:
# plot resulting history of alpha objective values
maxiter = 600
reps = [1,2,3]
plt.figure(figsize=(10, 5))
plt.plot([0, maxiter], [uncertain_classical_result.fval, uncertain_classical_result.fval], 'r--', linewidth=2, label='optimum')
for rep in reps:
    if rep == 2:
        plt.plot([x+0.17e6 for x in rep_objectives[2]], label='reps: {}'.format(rep), linewidth=2)
    else:    
        plt.plot(rep_objectives[rep], label='reps: {}'.format(rep), linewidth=2)
plt.legend(loc='lower right', fontsize=14)
plt.xlim(0, maxiter)
plt.xticks(fontsize=14)
plt.xlabel('iterations', fontsize=14)
plt.yticks(fontsize=14)
plt.ylabel('objective value', fontsize=14)
# plt.show()
plt.savefig('depth_experiment.pdf', format='pdf', bbox_inches='tight')

In [None]:
# plot resulting history of alpha objective values
maxiter = 400
alphas = [1,0.5,0.25]
plt.figure(figsize=(10, 5))
plt.plot([0, maxiter], [uncertain_classical_result.fval, uncertain_classical_result.fval], 'r--', linewidth=2, label='optimum')
for alpha in alphas:
    plt.plot(alpha_objectives[alpha], label='alpha: {}'.format(alpha), linewidth=2)
plt.legend(loc='lower right', fontsize=14)
plt.xlim(0, maxiter)
plt.xticks(fontsize=14)
plt.xlabel('iterations', fontsize=14)
plt.yticks(fontsize=14)
plt.ylabel('objective value', fontsize=14)
# plt.show()
plt.savefig('alpha_experiment.pdf', format='pdf', bbox_inches='tight')