<p style="text-align: right">
Lennart Binkowski<br>
Andreea-Iulia Lefterovici<br>
Tobias J. Osborne<br>
René Schwonnek<br>
Sören Wilkening<br>
Timo Ziegler<br>
</p>

<div style="text-align: justify; font-size: x-large">
    <b>Quantum-powered logistics: Towards an Efficient and Sustainable Supply Chain</b>
</div>

<div style='text-align: justify; font-size: large'>
A quantum-ready Qiskit implementation of Quantum Tree Generator and Quantum Conic Programming
</div>

In this Jupyter notebook, we present a comprehensive Logistics Optimization Problem (LOP) class, an implementation of the Classical Tree Generator (CTG), as well as Qiskit-implementations of the Quantum Tree Generator (QTG) and the Quantum Conic Programming (QCP) as detailed in the main document.

A Python version of at least 3.9 is required to execute the content of this notebook.

In [1]:
import sys
required_version = (3, 9)
if sys.version_info < required_version:
    raise ValueError(f"This notebook requires Python {required_version[0]}.{required_version[1]} or later.")

Our notebook uses mostly standard python packages. However, as the task of implementing several classical-quantum algorithms for a complex problem class is anything but standard, we need to import several nonstandard libraries:
- `gurobipy` for obtaining a good initial state for the QTG
- `matplotlib` for graphical representation of circuits and results
- `networkx` for a graphical representation of the PBS
- `numpy` for all sorts of vector manipulation
- `picos` for solving the arising semi-definite programs
- `qiskit` for writing the quantum routines
- `qiskit_aer` for a noiseless simulator
- `qiskit_ibm_runtime` for a fake quantum backend, including noise simulation

In [2]:
# standard libraries
from collections import Counter
from copy import deepcopy
from csv import reader, writer
from itertools import product
from math import acos, sqrt
from operator import countOf
from queue import Queue
from random import choice, choices, seed, randint, randrange, uniform
from typing import Union

# non-standard libraries
import gurobipy as gp
from matplotlib.pyplot import show
from networkx import draw, Graph
import numpy as np
import picos
from qiskit import transpile, QuantumCircuit, QuantumRegister, ClassicalRegister
from qiskit.circuit import Instruction, CircuitInstruction
from qiskit.circuit.library import IGate, SGate, RYGate, XGate, StatePreparation
from qiskit_aer import AerSimulator
from qiskit_ibm_runtime.fake_provider import FakeSherbrooke
from scipy.linalg import eigh

In [3]:
class LOP:
    """
    A class used to represent a LOP.

    ...

    Attributes
    ----------
    num_parts: int
        Number of parts in the PBS
    num_sites: int
        Number of sites
    coefficients: np.ndarray
        Array of transportation cost matrices for each product
    phi: dict[int, int]
        Dictionary of constituents and their resultants from the PBS
    reduced_phi: dict[int, int]
        Dictionary of constituents and their resultants leaving out tuples involving assigned parts
    parts: dict[int, int]
        Dictionary of parts assigned to their layer in the PBS
    layers: int
        Number of layers in the PBS
    psi: set[tuple[int, int]]
        Pairs of parts constituting to the same resultants
    reduced_psi: set[tuple[int, int]]
        Pairs of parts constituting to the same resultants without assigned parts
    free_parts: set[int]
        Parts not assigned yet
    available_sites: dict[int, set[int]]
        Dictionary assigning each product a set of available sites
    max_part: int
        Maximal index of an unassigned part
    num_var: int
        Number of variables used to represent the PBS
    assigned_costs: float
        Costs arising from pre-assignment

    Methods
    -------
    generate_coefficients(num_products: int, num_sites: int, save_to_file: bool = False) -> np.ndarray
        Returns a 3D array of randomly drawn coefficients corresponding to the transportation costs of each part from
        one site to another.
    generate_pbs(num_layers: int, num_parts: int, save_to_file: bool = False) -> dict[int, int]
        Returns a random PBS based on a given number of layers and a number of parts; returns a set of tuples
        resembling all resultant/constituent connections.
    from_parameters(num_parts: int, num_sites: int, num_layers: int, save_to_file: bool = False) -> LOP:
        Returns an LOP by randomly generating transportation cost matrices as well as the PBS.
    draw_pbs()
        Draws the PBS using the networkx drawing library.
    assign_parts(pre_assignments: dict[int, int])
        Assigns parts to sites and updates the PBS structure as well as the variables according to the constraints.
    assign_parts_inplace(pre_assignments: dict[int, int])
        Assigns parts to sites that were not assigned a pre-assignment.
    recall_parts_inplace(parts: set[int])
        Recall parts after they have been preassigned.
    var_index(part: int, site: int) -> int
        Returns the index of the variable corresponding to the given part and the given site.
    ctg(shots: int, initial_assignment: Union[int, str, dict[int, int]] = None, bias: float = 0.5) -> tuple[dict[int, int], float]
        Returns the best assignment and its costs found after applying the CTG several times.
    qtg() -> tuple[QuantumCircuit, int]
        Returns the quantum circuit of the initial state based on classical optimization and the number of ancilla
        qubits used in it.
    assignment_from_bitstring(bitstring: Union[int, str]) -> dict[int, int]
        Returns a dictionary of parts and assigned site based on a bitstring.
    is_feasible(bitstring: Union[int, str, dict[int, int]) -> bool
        Returns a boolean indicating whether the given assignment is a feasible solution.
    get_objective_value(assignment: Union[int, str, dict[int, int]]) -> float:
        Returns the objective value on a given assignment completed by any pre-assignment.
    get_objective_value_inplace() -> float
        Returns the objective value based on the pre-assignment. All unassigned parts are assigned the first site in
        their available sites.
    get_optimal_value() -> float:
        Returns the optimal value amongst all possible assignments.
    """

    @staticmethod
    def generate_coefficients(num_parts: int, num_sites: int, save_to_file: bool = False) -> np.ndarray:
        """
        Returns a 3D array of randomly drawn coefficients corresponding to the transportation costs of each part from
        one site to another.

        Parameters
        ----------
        num_parts: int
            Number of parts in the PBS
        num_sites: int
            Number of sites
        save_to_file: bool
            Whether to save the coefficient array to a .csv file
        """
        # Initialize a numpy array of num_products-1 2D numpy arrays, each corresponding to the transportation costs of
        # one part (the final product need not be shipped), to all zeros
        data: np.ndarray = np.zeros(shape=(num_parts - 1, num_sites, num_sites), dtype=np.float64)
        f1: np.ndarray = np.zeros(shape=num_parts, dtype=np.float64)
        f2: np.ndarray = np.zeros(shape=(num_sites, num_sites), dtype=np.float64)
        seed(a=1)

        # Generate a random coefficient for each part's individual transportation expense between 0.05 and 1
        for p in range(num_parts - 1):
            f1[p] = randrange(50, 1000) / 1000

        # Generate a random matrix with values between 0.1 and 1, displaying the transportation expense from one site to
        # another
        for s1 in range(num_sites - 1):
            for s2 in range(s1 + 1, num_sites):
                f2[s1, s2] = randrange(100, 1000) / 100

        # Fill entries of the p-th transportation cost matrix with the product of the p-th individual product expense
        # and the randomly generated transportation cost matrix rounded to two decimal places
        for p in range(num_parts - 1):
            for s1 in range(num_sites - 1):
                for s2 in range(s1 + 1, num_sites):
                    data[p, s1, s2] = round(f1[p] * f2[s1][s2], 2)

        # If save_to_file is True the transportation cost matrix is saved to a .csv file as a table ordered by the
        # part's index
        if save_to_file:
            header: list[str] = ['Part', 'Departure/Arrival site', 'Arrival/Departure site', 'Cost']

            with open('cost_' + str(num_parts).zfill(2) + '_' + str(num_sites).zfill(2) + '.csv', 'w') as f:
                file_writer: writer = writer(f)
                file_writer.writerow(header)
                for p in range(num_parts - 1):
                    for s1 in range(num_sites - 1):
                        for s2 in range(s1 + 1, num_sites):
                            file_writer.writerow([p + 2, s1 + 1, s2 + 1, data[p, s1, s2]])

            f.close()

        return data

    @staticmethod
    def generate_pbs(num_layers: int, num_parts: int, save_to_file: bool = False) -> dict[int, int]:
        """
        Returns a random PBS based on a given number of layers and a number of parts; returns a set of tuples
        resembling all resultant/constituent connections.

        num_layers: int
            The number of layers in the PBS
        num_parts: int
            The number of parts in the PBS
        save_to_file: bool
            Whether to save the PBS to a .csv file
        """
        while True:
            # Create a list of layers with the final product assigned to the first layer
            layers: list[list[int]] = [[] for _ in range(num_layers)]
            layers[0].append(1)

            # The remaining parts are distributed to layers of random sizes
            current_level: int = 1
            remaining_parts: list[int] = list(range(2, num_parts + 1))

            while remaining_parts:
                # If the current layer is the second only one part is assigned, otherwise some random amount between one
                # and the number of remaining parts or the number of layers, depending on which is smaller
                if current_level == 1:
                    num_assigned: int = 1
                else:
                    num_assigned: int = randint(1, min(len(remaining_parts), num_layers))

                # Add parts up to and excluding the num_assigned entry of remaining parts to the current layer and
                # remove them from the remaining parts
                layers[current_level].extend(remaining_parts[:num_assigned])
                remaining_parts = remaining_parts[num_assigned:]

                # Randomly choose a layer other than the top layer to assign parts to in the next iteration
                current_level = randint(1, num_layers - 1)

            # Check if each layer has at least one product
            if all(layer for layer in layers):
                break

        # Relabel elements across layers sequentially
        sequential_labels = 1
        for layer in range(num_layers):
            layer_size = len(layers[layer])
            layers[layer] = list(range(sequential_labels, sequential_labels + layer_size))
            sequential_labels += layer_size

        phi: dict[int, int] = {}

        # Iterate through all pairs of consecutive layers
        for layer in range(1, len(layers)):
            resultant_layer: list[int] = layers[layer - 1]
            constituent_layer: list[int] = layers[layer]

            # Initialize the recent resultant to the minimum of resultant_layer
            recent_resultant: int = min(resultant_layer)

            # Iterate through each constituent in the constituent layer
            for constituent in constituent_layer:
                # Randomly select an element from the current layer (greater than or equal to recent_element)
                eligible_resultants: list[int] = [p for p in resultant_layer if p >= recent_resultant]

                # Check if there are eligible elements in the current layer and update the most recently selected
                # element
                if eligible_resultants:
                    current_element: int = choice(eligible_resultants)
                    recent_resultant = current_element
                else:
                    current_element = choice(resultant_layer)

                # Add the pair to the list (or set phi in our case )
                phi[constituent] = current_element

        # If save_to_file is True the PBS is saved to a .csv file as a table of constituent and resultant
        if save_to_file:
            header = ['Lower level constituent part', 'Higher level resultant part']

            with open('pbs_' + str(num_parts).zfill(2) + '.csv', 'w') as f:
                file_writer = writer(f)
                file_writer.writerow(header)
                for constituent, resultant in phi.items():
                    file_writer.writerow([constituent, resultant])

            f.close()
        # Return the pairs (the set phi)
        return phi

    def __init__(self, coefficients: Union[str, np.ndarray], pbs: Union[str, dict[int, int]]):
        """
        Parameters
        ----------
        coefficients: Union[str, np.ndarray]
            A .csv file's name as a string or an array containing the transportation cost matrices for each product
        pbs: Union[str, dict[int, int]]
            A .csv file's name as a string or a dictionary containing all resultant/constituent connections of the PBS
        """
        # If input coefficients is a string read the transportation cost matrices from the corresponding .csv file
        if isinstance(coefficients, str):
            with open(coefficients, newline='') as coefficients_file:
                coefficients_content = list(reader(coefficients_file, delimiter=',', quotechar='|'))

            m_str, _, n_str, _ = next(row for row in reversed(coefficients_content) if len(row) != 0)
            self.num_parts: int = int(m_str)
            self.num_sites: int = int(n_str)

            # Construct an array of triangular matrices
            self.coefficients: np.ndarray = np.zeros((self.num_parts - 1, self.num_sites, self.num_sites), dtype=np.float64)

            for row in coefficients_content[1::]:
                if len(row) != 0:
                    self.coefficients[int(row[0]) - 2, int(row[1]) - 1, int(row[2]) - 1] = float(row[3])

        # Otherwise, initialize coefficients to the numpy array
        elif isinstance(coefficients, np.ndarray):
            self.num_parts: int = coefficients.shape[0] + 1
            self.num_sites: int = coefficients.shape[1]
            self.coefficients: np.ndarray = coefficients

        else:
            raise TypeError('Coefficients must be a filepath (String) or a numpy array')

        # Fill the transportation matrices' lower triangles by adding the transposed
        self.coefficients += self.coefficients.transpose(0, 2, 1)

        # If pbs is a str read phi corresponding to resultant/constituent connections from the corresponding .csv file
        if isinstance(pbs, str):
            self.phi: dict[int, int] = {}
            with open(pbs, newline='') as pbs_file:
                pbs_content: reader = reader(pbs_file, delimiter=',', quotechar='|')
                next(pbs_file)
                for row in pbs_content:
                    if len(row) != 0:
                        self.phi[int(row[0])] = int(row[1])

        # Otherwise, initialize phi to a deep copy of the input set
        elif isinstance(pbs, dict):
            self.phi: dict[int, int] = deepcopy(pbs)

        else:
            raise TypeError('PBS must be a filepath (String) or a dictionary of int keys and int values')

        # Initialize a set that contains remaining constituent/resultant connections after parts were pre-assigned
        self.reduced_phi: dict[int, int] = deepcopy(self.phi)

        # Create a dictionary assigning each product to its layer in the PBS
        self.parts: dict[int, int] = {}

        # Identify the products that are highest in the product chain
        resultants: set[int] = {q for q in self.phi.values() if q not in {p for p in self.phi.keys()}}
        layer: int = 1

        # Starting at the first layer assign each resultant to the current layer
        while resultants:
            constituents: set[int] = set()
            for q in resultants:
                self.parts[q] = layer
                # Update the resultants to be all parts sharing a tuple with one of the recent resultants
                constituents |= {p for (p, r) in self.phi.items() if r == q}
            resultants = constituents
            # Increase the layer index by one
            layer += 1
        self.layers: int = layer - 1

        # Construct the set psi of tuple of parts constituting to the same resultants
        self.psi: set[tuple[int, int]] = set()
        for r in self.parts.keys():
            count: int = 0
            buffer: set[int] = set()
            # Add all constituents of the current resultant to buffer
            for p, q in self.phi.items():
                if q == r:
                    count += 1
                    buffer.add(p)
            # If there are more than two constituents in buffer add each valid tuple to psi
            if count >= 2:
                [self.psi.add((p1, p2)) for p1 in buffer for p2 in buffer if p1 < p2]

        # Initialize a set that contains the remaining tuple of parts constituting to the same resultant after
        # pre-assignment
        self.reduced_psi: set[tuple[int, int]] = deepcopy(self.psi)

        # Initialize the set of parts not assigned yet
        self.free_parts: set[int] = {p for p in self.parts.keys()}

        # Initialize a dictionary assigning all sites to each part
        self.available_sites: dict[int, Union[int, set[int]]] = {p: set(range(self.num_sites)) for p in self.parts.keys()}

        # Initialize the maximal index of parts not assigned yet
        self.max_part: int = max(self.free_parts)

        # Initialize the total number of variables to the sum of available sites of each product
        self.num_var: int = self.var_index(self.max_part, max(self.available_sites[self.max_part])) + 1

        # Initialize the costs arising from pre-assignment
        self.assigned_costs: float = 0

    @classmethod
    def from_parameters(cls, num_parts: int, num_sites: int, num_layers: int, save_to_file: bool = False) -> 'LOP':
        """
        Returns an LOP by randomly generating transportation cost matrices as well as the PBS.

        num_parts: int
            Number of parts
        num_sites: int
            Number of sites
        num_layers: int
            Number of layers in the PBS
        save_to_file: bool
            Whether to save the LOP's underlying structures to .csv files
        """
        return cls(coefficients=LOP.generate_coefficients(num_parts, num_sites, save_to_file),
                   pbs=LOP.generate_pbs(num_layers, num_parts, save_to_file))

    def draw_pbs(self):
        """
        Draws the PBS using the networkx drawing library.
        """

        # Initialize an empty networkx graph
        pbs_graph: Graph = Graph()
        # Capture the number of items in each layer of the PBS in a list counting the layer appearance in the parts dict
        layer_sizes: list[int] = [0 for _ in range(self.layers)]
        for layer in range(self.layers):
            layer_sizes[layer] = countOf(self.parts.values(), (layer + 1))

        # Add a node to the graph for each part indexing it according to the number of nodes in previous layers and its
        # position in the current layer starting at node index one
        for i in range(self.layers):
            for j in range(layer_sizes[i]):
                node_index: int = sum(layer_sizes[:i]) + j + 1
                pbs_graph.add_node(node_index, layer=i + 1)

        # Add edges to the graph based on pairs of constituent and resultant provided by phi
        for (p, q) in self.phi.items():
            pbs_graph.add_edge(p, q)

        # Draw the graph by positioning each node according to its position of the layer (y) and within the layer (x)
        pos: dict[int, tuple[int, int]] = {}
        max_layer_size: int = max(layer_sizes)
        for i in range(self.layers):
            for j in range(layer_sizes[i]):
                node_index: int = sum(layer_sizes[:i]) + j + 1
                if i == 0 and j == 0:  # For the first node in the first layer
                    x: int = (max_layer_size - 1) // 2
                else:
                    x: int = j * (max_layer_size - 1) // (layer_sizes[i] - 1)
                y: int = -i
                pos[node_index] = (x, y)
        draw(G=pbs_graph, pos=pos, with_labels=True, node_size=500, node_color='skyblue', font_size=10, font_weight='bold', edgecolors='black')
        show()

    def assign_parts(self, pre_assignments: dict[int, int]):
        """
        Assigns parts to sites and updates the PBS structure as well as the variables according to the constraints.

        pre_assignments: dict[int, int]
            Dictionary of parts with site to be assigned to
        """

        assigned_parts: set[int] = set(pre_assignments.keys())

        # Update free_parts to exclude the assigned parts
        self.free_parts = self.parts.keys() - assigned_parts

        # Update available_sites such that each assigned product's value narrows down to its assigned site
        self.available_sites = {p: set(range(self.num_sites)) for p in self.free_parts}
        for (p, s) in pre_assignments.items():
            self.available_sites[p] = s

        # For each assigned part discard its assigned site from all available_sites for parts which are constituents/
        # resultants of the assigned part or share a common resultant
        for (p1, s) in pre_assignments.items():
            affected_constituents: set[int] = {p for p, q in self.phi.items() if p1 == q}
            affected_resultants: set[int] = {q for p, q in self.phi.items() if p1 == p}
            same_part_group: set[int] = {p for pair in self.psi for p in pair if p1 in pair}
            for p2 in self.free_parts & (affected_constituents | affected_resultants | same_part_group):
                self.available_sites[p2].discard(s)
                
        # reiterate over parts and check for forced assignments or infeasibility
        for p in self.free_parts:
            if len(self.available_sites[p]) == 0:
                raise ValueError("Pre-assignment made LOP infeasible")
            elif len(self.available_sites[p]) == 1:
                self.available_sites[p] = next(iter(self.available_sites[p]))
                self.free_parts.remove(p)   
                
        # Update reduced_phi to only contain tuples with both parts being unassigned
        self.reduced_phi = {p: q for (p, q) in self.phi.items() if p in self.free_parts and q in self.free_parts}

        # Update reduced_psi to only contain tuples with both parts being unassigned
        self.reduced_psi = {pair for pair in self.psi if
                            pair[0] in self.free_parts and pair[1] in self.free_parts}
        
        # Update the maximal index of unassigned parts
        self.max_part = max(self.free_parts) if self.free_parts else 0

        # Update the number of variables removing any variable of an assigned product
        self.num_var = self.var_index(self.max_part, max(self.available_sites[self.max_part])) + 1 \
            if self.max_part != 0 else 0

        # Update the assigned costs by adding costs for transport between any constituent- and resultant site where both
        # are readily assigned
        self.assigned_costs = 0
        for p1, p2 in {(p, q) for (p, q) in self.phi.items() if p in assigned_parts and q in assigned_parts}:
            self.assigned_costs += self.coefficients[p1 - 2][self.available_sites[p1]][self.available_sites[p2]]

    def assign_parts_inplace(self, pre_assignments: dict[int, int]):
        """
        Assigns parts to sites that were not assigned a pre-assignment.

        pre_assignments: dict[int, int]
            Dictionary of parts with site to be assigned to
        """

        if pre_assignments.keys() & (self.parts.keys() - self.free_parts):
            raise ValueError("Tried to reassign parts that were previously assigned")
        
        full_assignment: dict[int, int] = {p: self.available_sites[p] for p in (self.parts.keys() - self.free_parts)} | pre_assignments
        self.assign_parts(pre_assignments=full_assignment)

    def recall_parts_inplace(self, parts: set[int]):
        """
        Recall parts after they have been preassigned.

        parts: set[int]
            Parts to be unassigned again after pre-assignment
        """
        if parts & self.free_parts:
            raise ValueError("Tried to recall parts that were not assigned in the first place")
        full_assignment: dict[int, int] = {p: self.available_sites[p] for p in (self.parts.keys() - self.free_parts) - parts}
        self.assign_parts(pre_assignments=full_assignment)

    def var_index(self, part: int, site: int) -> int:
        """
        Returns the index of the variable corresponding to the given part and the given site.

        part: int
            Part the variable corresponds to
        site: int
            Site for the given part the variable corresponds to
        """

        # Sum up the available sites for all free parts preceding the given part and the available sites for the given
        # part preceding the given site
        return sum(len(self.available_sites[p]) for p in self.free_parts if p < part) + sum(
            s < site for s in self.available_sites[part])
    
    def ctg(self, shots: int, initial_assignment: Union[int, str, dict[int, int]] = None, bias: float = 0.5) -> tuple[dict[int, int], float]:
        """
        Returns the best assignment and its costs found after applying the CTG several times.
        
        shots: int
            Number of CTG applications
        initial_assignment: str
            Initial candidate for a good solution which may be biased towards
        bias: float
            Bias towards the candidate solution
        """
        if initial_assignment is None:
            initial_assignment = {}
            bias = 0.5
        minimizer: dict[int, int] = {}
        minimum: float = np.inf
        seed(a=1)
        for shot in range(shots):
            control_bits: list[bool] = [False] * len(self.free_parts)
            infeasible: bool = False
            current_assignment: dict[int, int] = {}
            for p, p_index in zip(sorted(self.free_parts), range(len(self.free_parts))):
                sorted_sites: list[int] = sorted(self.available_sites[p])
                clashing_parts: list[int] = [q for q in self.free_parts if q < p
                                             and ((p, q) in self.reduced_phi.items()
                                              or (q, p) in self.reduced_phi.items()
                                              or (p, q) in self.reduced_psi
                                              or (q, p) in self.reduced_psi)]
                if (not any((q, sorted_sites[0]) in current_assignment.items() for q in clashing_parts)
                        and uniform(0, 1) < (bias if (p, sorted_sites[0]) in initial_assignment.items() else 1 - bias)):
                        current_assignment[p] = sorted_sites[0]
                        control_bits[p_index] = True
                        continue
                for s in sorted_sites[1:-1]:
                    if (not (any((q, s) in current_assignment.items() for q in clashing_parts))
                            and uniform(0, 1) < (bias if (p, s) in initial_assignment.items() else 1 - bias)):
                            current_assignment[p] = s
                            control_bits[p_index] = True
                            continue
                if not (any((q, sorted_sites[-1]) in current_assignment.items() for q in clashing_parts)):
                    current_assignment[p] = sorted_sites[-1]
                    control_bits[p_index] = True
                
                if not control_bits[p_index]:
                    infeasible = True
                    break
            if infeasible:
                continue
            if self.get_objective_value(assignment=current_assignment) < minimum:
                minimizer = current_assignment
                minimum = self.get_objective_value(assignment=current_assignment)
        if minimum == np.inf:
            raise RuntimeError('Could not find any feasible solution with CTG')
        return minimizer, minimum
    
    def create_model(self) -> gp.Model:
        model = gp.Model('Logistics')
        model.setParam('OutputFlag', 0)

        # For all available sites of each free product add a binary variable to the gurobipy model and collect all in a
        # dict identified by the tuples (part, site)
        variables = {}
        for p in self.free_parts:
            for s in self.available_sites[p]:
                variables[p, s] = model.addVar(name=f"x[{p},{s}]", vtype=gp.GRB.BINARY)

        # Set up the objective function in the gurobipy model by summing over all terms where neither the resultant nor
        # the constituent is assigned together with the terms where either the resultant or the constituent is assigned
        model.setObjective(
            self.assigned_costs +
            gp.quicksum(self.coefficients[p1 - 2, s1, s2] * variables[(p1, s1)] * variables[(p2, s2)]
                        for (p1, p2) in self.reduced_phi.items()
                        for s1 in self.available_sites[p1]
                        for s2 in self.available_sites[p2]) +
            gp.quicksum(self.coefficients[p1 - 2, s, self.available_sites[p2]] * variables[(p1, s)]
                        for p1 in self.free_parts
                        for p2 in (self.parts.keys() - self.free_parts)
                        if (p1, p2) in self.phi.items()
                        for s in self.available_sites[p1]) +
            gp.quicksum(self.coefficients[p1 - 2, s, self.available_sites[p1]] * variables[(p2, s)]
                        for p1 in (self.parts.keys() - self.free_parts)
                        for p2 in self.free_parts
                        if (p1, p2) in self.phi.items()
                        for s in self.available_sites[p2]), sense=gp.GRB.MINIMIZE)

        # For each part add the one hot constraint to the gurobipy model
        for p in self.free_parts:
            model.addConstr(gp.quicksum(variables[(p, s)] for s in self.available_sites[p]) == 1,
                            name=f"{p} is produced exactly once")

        # Add the phi constraint to the gurobipy model
        model.addConstr(gp.quicksum(variables[(p1, s)] * variables[(p2, s)] for (p1, p2) in self.reduced_phi.items()
                                    for s in (self.available_sites[p1] & self.available_sites[p2])) == 0,
                        name=f"phi constraint")
        
        # Add the psi constraint to the gurobipy model
        model.addConstr(gp.quicksum(variables[(p1, s)] * variables[(p2, s)] for p1, p2 in self.reduced_psi
                                    for s in (self.available_sites[p1] & self.available_sites[p2])) == 0,
                        name=f"psi constraint")

        return model

    def qtg(self) -> tuple[QuantumCircuit, int]:
        """
        Returns the quantum circuit of the initial state based on classical optimization and the number of ancilla
        qubits used in it.
        """
        model = self.create_model()
        model.setParam('FeasibilityTol', 1e-9)
        model.setParam('IntFeasTol', 1e-9)
        model.setParam("IterationLimit", 1)

        # Process pending modifications, write model to a .lp file and optimize it using only one simplex step
        model.update()
        model.optimize()

        # Save the optimal solution found by gurobi to a bitstring
        assignment: list[bool] = [bool(var.X) for var in model.getVars()]

        anc: QuantumRegister = QuantumRegister(len(self.free_parts), 'ancilla')
        main: QuantumRegister = QuantumRegister(self.num_var, 'main')
        qc: QuantumCircuit = QuantumCircuit(anc, main)
        parts = sorted(self.free_parts)
        bias: float = self.num_var / 4.0
        false_angle: float = 2 * acos(sqrt((1.0 + bias) / (2.0 + bias)))
        true_angle: float = 2 * acos(sqrt(1.0 / (2.0 + bias)))

        for p, p_index in zip(parts, range(len(parts))):

            # For each free part collect its resultants as well as preceding parts with a common resultant
            clash_parts: list[int] = [q for q in self.free_parts if q < p
                                         and ((p, q) in self.reduced_phi.items()
                                              or (q, p) in self.reduced_phi.items()
                                              or (p, q) in self.reduced_psi
                                              or (q, p) in self.reduced_psi)]

            # Collect qubit indices corresponding to assign a clash product to the first available site of the current
            # part
            sites = sorted(self.available_sites[p])
            clash_qubits: list[int] = [main[self.var_index(q, sites[0])] for q in clash_parts
                                       if sites[0] in self.available_sites[q]]

            # If clashes with the current part's first available site exist rotate all clashing qubits about the y-axis
            # by an angle depending on the corresponding value of the model's solution controlled by the first
            # #(qubit clashes) ancilla qubits
            if clash_qubits:
                qc.append(RYGate(theta=true_angle if assignment[self.var_index(p, sites[0])] else false_angle).
                          control(num_ctrl_qubits=len(clash_qubits), ctrl_state=0),
                          clash_qubits + [main[self.var_index(p, sites[0])]])

            # Otherwise, perform a y-rotation on the qubit corresponding to the current part's first available site by
            # the angle depending on the corresponding value of the model's solution
            else:
                qc.ry(theta=true_angle if assignment[self.var_index(p, sites[0])] else false_angle,
                      qubit=main[self.var_index(p, sites[0])])

            # Perform an X-gate on the qubit corresponding to the current part's first available site controlled by the
            # ancilla corresponding to the current part
            qc.cx(main[self.var_index(p, sites[0])], anc[p_index])

            # For the remaining available sites of the current part, except for the last, collect all qubit indices
            # corresponding to the same site for clashing parts and perform a y-rotation on the qubit corresponding to
            # the current part and current site by the angle depending on the corresponding value of the model's
            # solution if the first #(qubit clashes) + 1 are in the all zero state.
            for s in sites[1:-1]:
                clash_qubits: list[int] = [main[self.var_index(q, s)] for q in clash_parts
                                           if s in self.available_sites[q]]
                qc.append(RYGate(theta=true_angle if assignment[self.var_index(p, s)] else false_angle).
                          control(num_ctrl_qubits=len(clash_qubits) + 1, ctrl_state=0),
                          [anc[p_index]] + clash_qubits + [main[self.var_index(p, s)]])
                qc.cx(main[self.var_index(p, s)], anc[p_index])

            # For the last available site of the current part, collect all qubit indices corresponding to the same site
            # for clashing parts and perform an X-gate on
            clash_qubits: list[int] = [main[self.var_index(q, sites[-1])] for q in clash_parts
                                       if sites[-1] in self.available_sites[q]]
            qc.append(XGate().control(num_ctrl_qubits=len(clash_qubits) + 1, ctrl_state=0),
                      [anc[0]] + clash_qubits + [main[self.var_index(p, sites[-1])]])
            qc.cx(main[self.var_index(p, sites[-1])], anc[p_index])
            qc.draw('mpl')
        return qc, anc.size
    
    def search_unitaries(self) -> list[QuantumCircuit]:
        
        _identity: QuantumCircuit = QuantumCircuit(self.num_var)
        _identity.id(range(0, self.num_var))
        
        _x_mixer: QuantumCircuit = QuantumCircuit(self.num_var)
        _x_mixer.rx(0.1, range(self.num_var))
        
        _z_mixer: QuantumCircuit = QuantumCircuit(self.num_var)
        _z_mixer.rz(0.15, range(self.num_var))
        
        return [_identity, _x_mixer, _z_mixer]

    def assignment_from_bitstring(self, bitstring: Union[int, str]) -> dict[int, int]:
        """
        Returns a dictionary of parts and assigned site based on a bitstring.

        bitstring: Union[int, str]
            A bitstring representing the assignment of parts to sites in right to left order
        """

        # If bitstring is a decimal, convert it to binary representation, remove the prefix '0b', append trailing zeros
        # such that length matches number of variables and reverse it
        if isinstance(bitstring, int):
            tmp: str = bin(bitstring)[2:].zfill(self.num_var)[::-1]

        # Otherwise, reverse the bitstring
        else:
            tmp: str = bitstring[::-1]

        # For each unassigned part identify the section in the bitstring and assign the site corresponding to a one in
        # the bitstring to the part in a dict
        result: dict[int, int] = {}
        parts: list = sorted(self.free_parts)
        for p in parts:
            sites: list[int] = sorted(self.available_sites[p])
            tmp2 = tmp[:len(sites)]
            for b, position in zip(tmp2, sites):
                if b == '1':
                    result[p] = position
                    break
            tmp = tmp[len(sites):]
        return result

    def is_feasible(self, assignment: Union[int, str, dict[int, int]]) -> bool:
        """
        Returns a boolean indicating whether the given assignment is a feasible solution.

        assignment: Union[int, str, dict[int, int]]
            An assignment of parts to sites
        """
        if isinstance(assignment, int):
            assignment = bin(assignment)[2:]
            
        if isinstance(assignment, str):
            if assignment.count('1') != len(self.free_parts):
                return False # each part must be 
            assignment = self.assignment_from_bitstring(assignment)
        elif not isinstance(assignment, dict):
            raise TypeError('Assignment must be either an integer, a string or a dictionary')
        
        # Check whether each part is assigned to exactly one site
        if set(assignment.keys()) != self.free_parts:
            return False

        # Check whether the sites for pairs of constituent and resultant disagree
        for p1, p2 in self.reduced_phi:
            if assignment[p1] == assignment[p2]:
                return False

        # Check whether the sites for pairs of constituents sharing a common resultant disagree
        for p1, p2 in self.reduced_psi:
            if assignment[p1] == assignment[p2]:
                return False

        return True
    
    def get_assignment(self) -> dict[int, int]:
        return {p: s for (p, s) in self.available_sites.items() if p not in self.free_parts}

    def get_objective_value(self, assignment: Union[int, str, dict[int, int]]) -> float:
        """
        Returns the objective value on a given assignment completed by any pre-assignment.

        assignment: Union[int, str, dict[int, set[int]]]
            Assignment in one of its various forms
        """

        # If assignment is a bitstring or a decimal, convert it to a dictionary. Otherwise, leave it as it is
        if isinstance(assignment, (int, str)):
            assignment = self.assignment_from_bitstring(bitstring=assignment)
        elif not isinstance(assignment, dict):
            raise TypeError('Assignment must be either an integer, a string or a dictionary')

        # Complete the assignment by adding the pre-assignments
        for p in (self.parts.keys() - self.free_parts):
            assignment[p] = self.available_sites[p]

        # Add the transportation cost for each pair in the PBS to the result
        result: float = 0
        for (p1, p2) in self.phi.items():
            result += self.coefficients[p1 - 2][assignment[p1]][assignment[p2]]
        return result

    def get_objective_value_inplace(self) -> float:
        """
        Returns the objective value based on the pre-assignment.
        """
        if self.free_parts:
            raise RuntimeError('All parts have to be assigned in order to calculate the objective value')
        result: float = 0
        for p1, p2 in self.phi.items():
            result += self.coefficients[p1 - 2][self.available_sites[p1]][self.available_sites[p2]]
        return result

    def get_optimal_value(self) -> float:
        """
        Returns the optimal value amongst all possible assignments.
        """

        # Initialize the output to infinity
        result: float = np.inf

        # Compare all assignments where unassigned parts are assigned to one available site
        for assignment in (tuple(zip(self.free_parts, sites)) for sites in
                           product(*(self.available_sites[p] for p in self.free_parts))):

            # Create a deepcopy of the LOP and assign unassigned parts
            instance_copy: LOP = deepcopy(self)
            instance_copy.assign_parts_inplace(assignment)

            # Feasibility check (one-hot constraints are automatically fulfilled by construction)
            feasible: bool = True

            # Check whether the sites for pairs of constituent and resultant disagree
            for (p1, p2) in instance_copy.phi.items():
                feasible &= (instance_copy.available_sites[p1] != instance_copy.available_sites[p2])

            # Check whether the sites for pairs of constituents sharing a common resultant disagree
            for (p1, p2) in instance_copy.psi:
                feasible &= (instance_copy.available_sites[p1] != instance_copy.available_sites[p2])

            # If the assigment is feasible and its objective value is lower than the previous lowest update result
            if feasible:
                result = min(result, instance_copy.get_objective_value_inplace())

            del instance_copy
        return result

The next cell holds the `moment_matrices`-routine which constructs the objective and constraints for the auxiliary SDP. For a given LOP instance, consider the *feasible subspace*
\begin{equation}
    \mathcal{S} = \text{span}(\vert z\rangle\, :\, z \text{ is a feasible solution}),
\end{equation}
the projection $P_{\text{f}}$ on it, and the projection $P_{\text{if}}$ on its orthogonal complement. Furthermore, let
\begin{equation}
    C = \sum_{z} c(z) \vert z\rangle\langle z\vert
\end{equation}
be the instance's objective Hamiltonian, where $c(z)$ denotes the classical objective value of the bitstring $z$. For a given initial state $\vert \iota\rangle$ and a family of search unitaries $U_{i}$, $i = 1, \ldots, \ell$, the `moment_matrices`-routine calculates the following matrix elements:
\begin{equation}
    \boldsymbol{\text{H}}_{i j} = \langle \iota \vert U_{i}^{\dagger} P_{\text{f}} C U_{j} \vert \iota \rangle,\quad
    \boldsymbol{\text{E}}_{i j} = \langle \iota \vert U_{i}^{\dagger} P_{\text{f}} U_{j} \vert \iota \rangle,\quad
    \boldsymbol{\text{F}}_{i j} = \langle \iota \vert U_{i}^{\dagger} P_{\text{if}} U_{j} \vert \iota \rangle.
\end{equation}

In [4]:
class Placeholder(Instruction):
    """
    A child class of Qiskit's Instruction class initializing an empty instruction on a quantum circuit

    ...

    Attributes:
    -----------
    name: str
        Name of the instruction is 'placeholder'

    Methods:
    --------
    inverse(annotated: bool = False)
        Initializes a new instance of an instruction placeholder.
    """
    def __init__(self, num_qubits: int, label: str):
        self.name = 'placeholder'
        super().__init__(self.name, num_qubits, 0, [], label=label)

    def inverse(self, annotated: bool = False):
        """
        Initializes a new instance of an instruction placeholder.

        annotated: bool
        """
        return Placeholder(self.name, self.num_qubits)
    
def moment_matrices(simulator: AerSimulator,
                    instance: LOP,
                    search_unitaries: list[QuantumCircuit],
                    shots: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Returns the moment matrices for a given LOP instance.

    simulator: AerSimulator
        Simulator used to sample bitstring from the quantum computer
    instance: LOP
        Instance for which the moment matrices are computed for
    search_unitaries:
        Unitaries to explore the variational cone
    shots:
        Number of shots to estimate the moment matrices' entries
    """

    # Number of columns and rows of the moment matrices is the number of search unitaries
    dim: int = len(search_unitaries)

    # The state altered by the search unitaries is the initial state defined in the LOP class
    state_prep: tuple[QuantumCircuit, int] = instance.qtg()

    # The number of qubits for the main register is the number of qubits needed for the state preparation
    num_tot_qubits: int = state_prep[0].num_qubits

    # Initialize a dictionary for storing the results of expectation values for candidate states
    diagonal_results: dict[tuple[int, int], float] = {(h, i): 0.0 for h in range(3) for i in range(dim)}

    # Initialize a dictionary for storing the results of Hadamard tests for superposition states
    sp_results: dict[tuple[int, int, int, int], float] = {(g, h, i, j): 0.0 for g in range(2) for h in range(3)
                                                          for i in range(dim) for j in range(i + 1, dim)}

    result: tuple[np.ndarray, np.ndarray, np.ndarray] = (np.zeros(shape=(dim, dim), dtype=np.complex64),
                                                         np.zeros(shape=(dim, dim), dtype=np.complex64),
                                                         np.zeros(shape=(dim, dim), dtype=np.complex64))

    anc: QuantumRegister = QuantumRegister(1, 'anc')

    # Initialize the quantum register for the ancilla qubits needed to prepare the initial state
    inactive_q: QuantumRegister = QuantumRegister(state_prep[1], 'inactive_q')

    # Initialize the quantum register carrying the state altered by the search unitaries
    q: QuantumRegister = QuantumRegister(num_tot_qubits - state_prep[1], 'q')

    # Initialize a classical register carrying the measurement result of the ancilla in the Hadamard test
    anc_c: ClassicalRegister = ClassicalRegister(1, 'anc_c')

    # Initialize a classical register carrying the measurement result of the state after search unitaries were applied
    c: ClassicalRegister = ClassicalRegister(num_tot_qubits - state_prep[1], 'c')

    for i in range(dim):
        # Initialize the quantum circuit to sample bitstrings from the application of each search unitary to the main
        # register
        qc_diag: QuantumCircuit = QuantumCircuit(inactive_q, q, c)

        # Prepare the initial state on the main register with the aid of the preparation ancilla qubits
        qc_diag.compose(other=state_prep[0], qubits=inactive_q[:] + q[:], inplace=True)

        # Apply the current search unitary, measure the qubits shots times and assign the resulting bitstrings to the
        # number of their appearances in a dictionary
        qc_diag.compose(other=search_unitaries[i], qubits=q, inplace=True)
        qc_diag.measure(qubit=q, cbit=c)
        counts_diag: dict[str, float] = simulator.run(transpile(circuits=qc_diag, backend=simulator),
                                                      shots=shots).result().get_counts()

        # For each bitstring in the sample, if it is a feasible assignment add its relative objective value to the first
        # and its frequency of appearance to the second of the current search unitary's diagonal entries.
        for bs, f in counts_diag.items():
            if instance.is_feasible(bs):
                diagonal_results[0, i] += instance.get_objective_value(bs) * f / shots
                diagonal_results[1, i] += f / shots
            # If the bitstring is an infeasible assignment add its frequency of appearance to the third of the current
            # search unitary's diagonal entries
            else:
                diagonal_results[2, i] += f / shots

        for j in range(i + 1, dim):
            qc_sp: QuantumCircuit = QuantumCircuit(anc, inactive_q, q, anc_c, c)

            # Perform a Hadamard gate on the ancilla qubit
            qc_sp.h(qubit=anc)

            # Insert the placeholder for the phase get when computing the imaginary part
            first_s_location: int = len(qc_sp.data)
            qc_sp.append(Placeholder(1, label='id/s'), anc)

            # Prepare the initial state on the main register with the aid of the preparation ancilla qubits
            qc_sp.compose(other=state_prep[0], qubits=inactive_q[:] + q[:], inplace=True)

            # Perform the inner loop's search unitary when the ancilla is in the zero state
            qc_sp.compose(other=search_unitaries[j].control(num_ctrl_qubits=1, ctrl_state=0), qubits=anc[:] + q[:],
                          inplace=True)

            # Perform the outer loop's search unitary when the ancilla is in the one state
            qc_sp.compose(other=search_unitaries[i].control(num_ctrl_qubits=1, ctrl_state=1), qubits=anc[:] + q[:],
                          inplace=True)

            # Perform another Hadamard gate on the ancilla
            qc_sp.h(qubit=anc)

            # Measure the ancilla qubit and store the result to the ancilla bit
            qc_sp.measure(qubit=anc, cbit=anc_c)

            # Measure the main register and store the result to the ancilla qubits
            qc_sp.measure(qubit=q, cbit=c)

            # Initialize a list of dictionary to later assign bitstrings to their count for the real and imaginary parts
            # of the moment matrices' entries
            counts_sp: list[dict[str, float]] = []

            # For the real part, insert an identity to the placeholder and run the circuit, storing the counts
            qc_sp.data[first_s_location] = CircuitInstruction(operation=IGate(), qubits=[qc_sp.qubits[0]], clbits=[])
            counts_sp.append(simulator.run(transpile(circuits=qc_sp, backend=simulator),
                                           shots=shots).result().get_counts())

            # For the real part, insert a phase gate to the placeholder and run the circuit, storing the counts
            qc_sp.data[first_s_location] = CircuitInstruction(operation=SGate(), qubits=[qc_sp.qubits[0]], clbits=[])
            counts_sp.append(simulator.run(transpile(circuits=qc_sp, backend=simulator),
                                           shots=shots).result().get_counts())

            # For the real and imaginary part of all entries,
            for g in range(2):
                for bs, f in counts_sp[g].items():
                    if bs[-1] == '0':
                        if instance.is_feasible(bs[:-1]):
                            sp_results[g, 0, i, j] += instance.get_objective_value(bs[:-1]) * f
                            sp_results[g, 1, i, j] += f
                        else:
                            sp_results[g, 2, i, j] += f

    for h in range(3):
        for i in range(dim):
            result[h][i][i] = diagonal_results[h, i]
            for j in range(i + 1, dim):
                result[h][i][j] = (4 * sp_results[0, h, i, j] / shots - 4j * sp_results[1, h, i, j] / shots
                                   + (1j - 1) * (diagonal_results[h, i] + diagonal_results[h, j])) / 2
                result[h][j][i] = np.conjugate(result[h][i][j])

    return result

The moment matrices are finally passed to an SDP solver whose clearly defined interface with the rest of the algorithm only consists of the objective matrix $\boldsymbol{\text{H}}$ and both constraint matrices $\boldsymbol{\text{E}}$, $\boldsymbol{\text{F}}$. This allows to replace it with further customized SDP solvers. An exciting perspective would be to use QCP, as it is itself an SDP solver, to solve the auxiliary SDPs, produced by running QCP on larger instances.
 In any case, the solver is asked to the find the optimum to the following SDP:
\begin{equation}
\min_{\boldsymbol{\text{X}}} \text{tr}(\boldsymbol{\text{H}}\, \boldsymbol{\text{X}}) \\
\text{s.t.}\ \text{tr}(\boldsymbol{\text{E}}\, \boldsymbol{\text{X}}) = 1 \\
\text{tr}(\boldsymbol{\text{F}}\, \boldsymbol{\text{X}}) = 0 \\
\boldsymbol{\text{X}} \succeq 0.
\end{equation}


In [5]:
class SDPException(Exception):
    """
    Raised when the auxiliary SDP could not be solved
    """
    pass

def solve_auxiliary_gep(obj_mat: np.ndarray, feasible_mat: np.ndarray, infeasible_mat: np.ndarray) -> tuple[np.ndarray, float]:
    delta: float = 0.00001
    eigenvalues_constr_mat, transformation = eigh(feasible_mat, check_finite=False)
    reduced_obj_mat: np.ndarray = np.conjugate(transformation.T) @ obj_mat @ transformation
    reduced_infeasible_mat: np.ndarray = np.conjugate(transformation.T) @ infeasible_mat @ transformation
    kernel_dim: int = sum(e < 10 ** (-4) for e in eigenvalues_constr_mat)
    if kernel_dim != 0:
        eigenvalues_constr_mat = eigenvalues_constr_mat[kernel_dim:]
        mask = np.append(np.zeros(kernel_dim, bool), np.ones(reduced_obj_mat.shape[0] - kernel_dim, bool))
        reduced_obj_mat = reduced_obj_mat[mask, :][:, mask]
        reduced_infeasible_mat = reduced_infeasible_mat[mask, :][:, mask]
        
    reduced_feasible_mat: np.ndarray = np.diag(eigenvalues_constr_mat.astype(np.complex64))
    prob = picos.Problem()
    x = picos.HermitianVariable("X", len(reduced_obj_mat))
    h = picos.Constant("H", reduced_obj_mat.tolist())
    e = picos.Constant("E", reduced_feasible_mat.tolist())
    f = picos.Constant("F", reduced_infeasible_mat.tolist())
    prob.set_objective("min", picos.trace(h * x).real)
    prob.add_constraint(x >> 0)
    prob.add_constraint(picos.trace(e * x).real >= 1 - delta)
    prob.add_constraint(picos.trace(e * x).real <= 1 + delta)
    prob.add_constraint(picos.trace(f * x).real >= 0 - delta)
    prob.add_constraint(picos.trace(f * x).real <= 0 + delta)
    # prob.add_constraint(picos.trace(e * x).real == 1)
    # prob.add_constraint(picos.trace(f * x).real == 0)
    try:
        prob.solve()
        solution: np.ndarray = np.array(x.value)
        enlarged_solution = np.zeros(shape=obj_mat.shape, dtype=np.complex64)
        enlarged_solution[obj_mat.shape[0] - solution.shape[0]:, obj_mat.shape[1] - solution.shape[1]:] = solution
        result = transformation.T @ enlarged_solution @ np.conjugate(transformation.T), prob.value
    except (ValueError, IndexError, picos.modeling.problem.SolutionFailure):
        print(f"SDP {prob.status}; {prob.value}")
        raise SDPException
    return result

In [6]:
instance1 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance1.assign_parts({1: 1, 2: 3, 3: 2, 4: 5, 5: 5, 6: 1, 7: 2, 8: 1, 9: 1})

instance2 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance2.assign_parts({1: 1, 2: 3, 3: 2, 4: 5, 5: 5, 6: 1, 7: 2, 8: 1})

instance3 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance3.assign_parts({1: 1, 2: 3, 3: 2, 4: 5, 5: 5, 6: 1, 7: 2})

instance4 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance4.assign_parts({1: 1, 2: 3, 3: 2, 4: 5, 5: 5, 6: 1})

instance5 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance5.assign_parts({1: 1, 2: 3, 3: 2, 4: 5, 5: 5})

instance6 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance6.assign_parts({1: 1, 2: 3, 3: 2, 4: 5})

instance7 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance7.assign_parts({1: 1, 2: 3, 3: 2})

instance8 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance8.assign_parts({1: 1, 2: 3})

instance9 = LOP('cost_10_07.csv', 'pbs_10.csv')
instance9.assign_parts({1: 1})

instance_final = LOP('cost_10_07.csv', 'pbs_10.csv')

def report(instance: LOP):
    print(f'Number of products: {instance.num_parts}')
    print(f'Number of sites: {instance.num_sites}')
    print(f'Phi constraints: {instance.phi}')
    print(f'Reduced phi constraints: {instance.reduced_phi}')
    print(f'Psi constraints: {instance.psi}')
    print(f'Reduced psi constraints: {instance.reduced_psi}')
    print(f'Products: {instance.parts}')
    print(f'Number of layers: {instance.layers}')
    print(f'Free parts: {instance.free_parts}')
    print(f'Available sites: {instance.available_sites}')
    print(f'Maximum product: {instance.max_part}')
    print(f'Number of variables: {instance.num_var}')
    print(f'Assigned cost: {instance.assigned_costs}')

In [7]:
def controlled_unitary(search_unitaries: list[QuantumCircuit]) -> QuantumCircuit:
    dim: int = len(search_unitaries)
    num_main_qubits: int = search_unitaries[0].num_qubits
    num_anc_qubits: int = (dim - 1).bit_length()
    result: QuantumCircuit = QuantumCircuit(QuantumRegister(num_anc_qubits + num_main_qubits))
    for i in range(dim):
        result.compose(search_unitaries[i].control(num_ctrl_qubits=num_anc_qubits, ctrl_state=i), inplace=True)
    return result

def lcu(init_state: tuple[QuantumCircuit, int], aux_state: np.ndarray,
        search_unitaries: list[QuantumCircuit]) -> QuantumCircuit:
    num_ctrl_qubits: int = (len(search_unitaries) - 1).bit_length()
    aux_state_reg: QuantumRegister = QuantumRegister(num_ctrl_qubits, 'aux_state')
    state_anc_reg: QuantumRegister = QuantumRegister(init_state[1], 'state_anc')
    state_main_reg: QuantumRegister = QuantumRegister(init_state[0].num_qubits - init_state[1], 'state_main')
    qc: QuantumCircuit = QuantumCircuit(aux_state_reg, state_anc_reg, state_main_reg)
    qc.compose(init_state[0], list(state_anc_reg) + list(state_main_reg), inplace=True)
    qc.compose(
        StatePreparation(params=list(aux_state) + [0.0 for _ in range((1 << num_ctrl_qubits) - len(search_unitaries))],
                         normalize=True), qubits=aux_state_reg, inplace=True)
    for i in range(len(search_unitaries)):
        qc.compose(search_unitaries[i].control(num_ctrl_qubits=num_ctrl_qubits, ctrl_state=i),
                   qubits=list(aux_state_reg) + list(state_main_reg), inplace=True)
    superposition: list[float] = ([1 / sqrt(len(search_unitaries)) for _ in range(len(search_unitaries))]
                                  + [0.0 for _ in
                                     range((1 << (len(search_unitaries) - 1).bit_length()) - len(search_unitaries))])
    qc.compose(StatePreparation(params=list(superposition), normalize=True).inverse(),
               qubits=aux_state_reg, inplace=True)

    return qc

In [8]:
def qcp(simulator: AerSimulator, instance: LOP, search_unitaries: list[QuantumCircuit], matrix_shots: int = 1024, sample_shots: int = 256, final_shots: int = 256) -> tuple[dict[int, int], float]:
    obj_mat, feasible_mat, infeasible_mat = moment_matrices(simulator=simulator, instance=instance, search_unitaries=search_unitaries, shots=matrix_shots)
    aux_state, _ = solve_auxiliary_gep(obj_mat=obj_mat, feasible_mat=feasible_mat, infeasible_mat=infeasible_mat)
    aux_size: int = len(search_unitaries)
    num_aux_qubits: int = (aux_size - 1).bit_length()
    probabilities, pure_states = eigh(aux_state)
    neglected_dim: int = sum(p < 10 ** (-4) for p in probabilities)
    probabilities = probabilities[neglected_dim:]
    pure_states = pure_states.T[neglected_dim:]
    samples = Counter(choices(population=range(aux_size - neglected_dim), weights=probabilities, k=sample_shots))
    init_state: tuple[QuantumCircuit, int] = instance.qtg()
    qcp_counts_trimmed: dict[str, int] = {}
    qcp_feasible_shots: int = 0
    for (state_index, shots) in samples.items():
        state: np.ndarray = pure_states[state_index]
        qcp_circuit: QuantumCircuit = lcu(init_state=init_state,
                                          aux_state=state,
                                          search_unitaries=search_unitaries)
        qcp_circuit.measure_all(inplace=True, add_bits=True)
        qcp_counts = simulator.run(transpile(circuits=qcp_circuit, backend=simulator), shots=shots * final_shots).result().get_counts()
        for bs, c in qcp_counts.items():
            if bs[-num_aux_qubits:] == '0' * num_aux_qubits:
                bs_trimmed = bs[:qcp_circuit.num_qubits - num_aux_qubits - init_state[1]]
                qcp_counts_trimmed[bs_trimmed] = qcp_counts_trimmed.get(bs_trimmed, 0) + c
                qcp_feasible_shots += c
    minimizer: dict[int, int] = {}
    minimum: float = np.inf
    for bs in qcp_counts_trimmed.keys():
        if instance.is_feasible(bs):
            if instance.get_objective_value(bs) < minimum:
                minimizer = instance.assignment_from_bitstring(bs) | instance.get_assignment()
                minimum = instance.get_objective_value(bs)
    return minimizer, minimum

In [20]:
CTG_SHOTS: int = 512
MAX_QUBITS: int = 12
SIMULATOR: AerSimulator = AerSimulator()
MATRIX_SHOTS: int = 1024
SAMPLE_SHOTS: int = 256
FINAL_SHOTS: int = 256

def bound(instance: LOP) -> tuple[float, tuple[dict[int, int], float]]:
    # derive lower bound via relaxation
    model: gp.Model = instance.create_model()
    model.setParam('IterationLimit', 200)
    model.reset()
    model.optimize()
    lower_bound: float = model.ObjBound
    
    #derive upper bound via finding a good feasible state
    if instance.num_var <= MAX_QUBITS:
        upper_sol, upper_bound = qcp(SIMULATOR, instance, instance.search_unitaries(), MATRIX_SHOTS, SAMPLE_SHOTS, FINAL_SHOTS)
    else:
        model.reset()
        model.setParam('IterationLimit', 50)
        model.optimize()
        assigned: dict[int, int] = instance.assignment_from_bitstring(''.join(format(x, 'b')for x in reversed([int(var.X) for var in model.getVars()])))
        upper_sol: dict[int, int] = instance.get_assignment() | assigned
        upper_bound: float = model.ObjVal
    return lower_bound, (upper_sol, upper_bound)
        
    

def branch_and_bound(instance: LOP) -> tuple[dict[int, int], float]:
    solution, global_upper_bound = instance.ctg(shots=CTG_SHOTS)
    print(f'Initial solution (provided by CTG): {solution}')
    print(f'Initial upper bound (provided by CTG): {global_upper_bound}')
    instance_queue: Queue = Queue()
    instance_queue.put(instance)
    current_layer: int = 1
    current_num_parts: int = len(instance.free_parts) + 1
    while not instance_queue.empty():
        current_instance: LOP = instance_queue.get()
        if len(current_instance.free_parts) < current_num_parts:
            print(f'\nEntering layer {current_layer}\n' + ('-' * 20))
            current_layer += 1
            current_num_parts = len(current_instance.free_parts)
        if current_instance.num_var == 0:
            objective_value: float = current_instance.get_objective_value_inplace()
            if objective_value < global_upper_bound:
                solution = current_instance.get_assignment()
                global_upper_bound = objective_value
                print(f'Updated solution: {solution} ({global_upper_bound}) via direct calculation')
        else:
            current_bounds: tuple[float, tuple[dict[int, int], float]] = bound(current_instance)
            if current_bounds[0] >= global_upper_bound:
                continue
        
            if current_bounds[1][1] < global_upper_bound:
                solution, global_upper_bound = current_bounds[1]
                print(f'Updated solution: {solution} ({global_upper_bound}) via upper bound heuristic')
            
            current_part: int = max(current_instance.free_parts)
            for site in current_instance.available_sites[current_part]:
                subinstance: LOP = deepcopy(current_instance)
                subinstance.assign_parts_inplace({current_part: site})
                instance_queue.put(subinstance)
                
    return solution, global_upper_bound