## Quadratic Assignment Problem

In [1]:
import numpy as np
from docplex.mp.model import Model

def generate_qap_instance(n, seed=None):
    """
    Generates an instance for the Quadratic Assignment Problem (QAP).

    Parameters:
    - n: int, the number of facilities/locations.
    - seed: int, random seed for reproducibility (optional).

    Returns:
    - a: 2D numpy array, the flow matrix.
    - b: 2D numpy array, the distance matrix.
    """
    if seed is not None:
        np.random.seed(seed)

    # Generate random flow and distance matrices
    a = np.random.randint(1, 100, size=(n, n))  # Flow matrix
    b = np.random.randint(1, 100, size=(n, n))  # Distance matrix

    return a, b

def create_qap_model(n, seed=None):
    """
    Creates a CPLEX model for the Quadratic Assignment Problem (QAP).

    Parameters:
    - n: int, the number of facilities/locations.
    - seed: int, random seed for reproducibility (optional).

    Returns:
    - model: CPLEX model.
    - x: 2D list of CPLEX binary variables representing assignments.
    - a: 2D numpy array, the flow matrix.
    - b: 2D numpy array, the distance matrix.
    """
    # Generate QAP instance
    a, b = generate_qap_instance(n, seed)

    # Create CPLEX model
    model = Model(name="Quadratic Assignment Problem")

    # Decision variables: x[i][j] = 1 if facility i is assigned to location j, 0 otherwise
    x = [[model.binary_var(name=f"x_{i}_{j}") for j in range(n)] for i in range(n)]

    # Objective: Minimize the total cost
    model.minimize(
        model.sum(a[i, k] * b[j, l] * x[i][j] * x[k][l] for i in range(n) for j in range(n) for k in range(n) for l in range(n))
    )

    # Constraints: Each facility is assigned to exactly one location
    for i in range(n):
        model.add_constraint(model.sum(x[i][j] for j in range(n)) == 1, f"assignment_constraint_facility_{i}")

    # Constraints: Each location is assigned to exactly one facility
    for j in range(n):
        model.add_constraint(model.sum(x[i][j] for i in range(n)) == 1, f"assignment_constraint_location_{j}")

    return model, x, a, b

# Example usage
if __name__ == "__main__":
    n = 5  # Number of facilities/locations
    seed = 42  # Random seed for reproducibility

    model, x, a, b = create_qap_model(n, seed)

    # Print model summary
    print(model.export_as_lp_string())

    # Solve the model
    solution = model.solve()

    if solution:
        print("Objective value:", solution.objective_value)
        assignment = [[x[i][j].solution_value for j in range(n)] for i in range(n)]
        print("Assignment matrix:")
        for row in assignment:
            print(row)
    else:
        print("No solution found.")


\ This file has been generated by DOcplex
\ ENCODING=ISO-8859-1
\Problem name: Quadratic Assignment Problem

Minimize
 obj: [ 6032 x_0_0^2 + 8424 x_0_0*x_0_1 + 10816 x_0_0*x_0_2 + 10400 x_0_0*x_0_3
      + 10192 x_0_0*x_0_4 + 13224 x_0_0*x_1_0 + 6570 x_0_0*x_1_1
      + 17184 x_0_0*x_1_2 + 11256 x_0_0*x_1_3 + 17220 x_0_0*x_1_4
      + 11948 x_0_0*x_2_0 + 11044 x_0_0*x_2_1 + 5310 x_0_0*x_2_2
      + 10446 x_0_0*x_2_3 + 3962 x_0_0*x_2_4 + 8584 x_0_0*x_3_0
      + 3404 x_0_0*x_3_1 + 12876 x_0_0*x_3_2 + 7260 x_0_0*x_3_3
      + 13132 x_0_0*x_3_4 + 14500 x_0_0*x_4_0 + 10236 x_0_0*x_4_1
      + 12778 x_0_0*x_4_2 + 12506 x_0_0*x_4_3 + 11998 x_0_0*x_4_4 + 4368 x_0_1^2
      + 16016 x_0_1*x_0_2 + 11960 x_0_1*x_0_3 + 10504 x_0_1*x_0_4
      + 11898 x_0_1*x_1_0 + 9576 x_0_1*x_1_1 + 19716 x_0_1*x_1_2
      + 13470 x_0_1*x_1_3 + 15762 x_0_1*x_1_4 + 5642 x_0_1*x_2_0
      + 8652 x_0_1*x_2_1 + 13672 x_0_1*x_2_2 + 11480 x_0_1*x_2_3
      + 6096 x_0_1*x_2_4 + 8584 x_0_1*x_3_0 + 6216 x_0_1*x_3_1
      +

In [2]:
# print the model
print(model.export_as_lp_string())

\ This file has been generated by DOcplex
\ ENCODING=ISO-8859-1
\Problem name: Quadratic Assignment Problem

Minimize
 obj: [ 6032 x_0_0^2 + 8424 x_0_0*x_0_1 + 10816 x_0_0*x_0_2 + 10400 x_0_0*x_0_3
      + 10192 x_0_0*x_0_4 + 13224 x_0_0*x_1_0 + 6570 x_0_0*x_1_1
      + 17184 x_0_0*x_1_2 + 11256 x_0_0*x_1_3 + 17220 x_0_0*x_1_4
      + 11948 x_0_0*x_2_0 + 11044 x_0_0*x_2_1 + 5310 x_0_0*x_2_2
      + 10446 x_0_0*x_2_3 + 3962 x_0_0*x_2_4 + 8584 x_0_0*x_3_0
      + 3404 x_0_0*x_3_1 + 12876 x_0_0*x_3_2 + 7260 x_0_0*x_3_3
      + 13132 x_0_0*x_3_4 + 14500 x_0_0*x_4_0 + 10236 x_0_0*x_4_1
      + 12778 x_0_0*x_4_2 + 12506 x_0_0*x_4_3 + 11998 x_0_0*x_4_4 + 4368 x_0_1^2
      + 16016 x_0_1*x_0_2 + 11960 x_0_1*x_0_3 + 10504 x_0_1*x_0_4
      + 11898 x_0_1*x_1_0 + 9576 x_0_1*x_1_1 + 19716 x_0_1*x_1_2
      + 13470 x_0_1*x_1_3 + 15762 x_0_1*x_1_4 + 5642 x_0_1*x_2_0
      + 8652 x_0_1*x_2_1 + 13672 x_0_1*x_2_2 + 11480 x_0_1*x_2_3
      + 6096 x_0_1*x_2_4 + 8584 x_0_1*x_3_0 + 6216 x_0_1*x_3_1
      +

In [3]:
import os
import json
import numpy as np
from docplex.mp.model import Model
import io
import time
import os
import json
import sys
import numpy as np
from docplex.mp.model import Model
from qiskit_optimization.translators import from_docplex_mp
from qiskit_optimization.converters import QuadraticProgramToQubo
from qiskit.primitives import StatevectorEstimator, StatevectorSampler

estimator = StatevectorEstimator()

In [4]:
qp = from_docplex_mp(model)
print(qp.export_as_lp_string())

\ This file has been generated by DOcplex
\ ENCODING=ISO-8859-1
\Problem name: Quadratic Assignment Problem

Minimize
 obj: [ 6032 x_0_0^2 + 8424 x_0_0*x_0_1 + 10816 x_0_0*x_0_2 + 10400 x_0_0*x_0_3
      + 10192 x_0_0*x_0_4 + 13224 x_0_0*x_1_0 + 6570 x_0_0*x_1_1
      + 17184 x_0_0*x_1_2 + 11256 x_0_0*x_1_3 + 17220 x_0_0*x_1_4
      + 11948 x_0_0*x_2_0 + 11044 x_0_0*x_2_1 + 5310 x_0_0*x_2_2
      + 10446 x_0_0*x_2_3 + 3962 x_0_0*x_2_4 + 8584 x_0_0*x_3_0
      + 3404 x_0_0*x_3_1 + 12876 x_0_0*x_3_2 + 7260 x_0_0*x_3_3
      + 13132 x_0_0*x_3_4 + 14500 x_0_0*x_4_0 + 10236 x_0_0*x_4_1
      + 12778 x_0_0*x_4_2 + 12506 x_0_0*x_4_3 + 11998 x_0_0*x_4_4 + 4368 x_0_1^2
      + 16016 x_0_1*x_0_2 + 11960 x_0_1*x_0_3 + 10504 x_0_1*x_0_4
      + 11898 x_0_1*x_1_0 + 9576 x_0_1*x_1_1 + 19716 x_0_1*x_1_2
      + 13470 x_0_1*x_1_3 + 15762 x_0_1*x_1_4 + 5642 x_0_1*x_2_0
      + 8652 x_0_1*x_2_1 + 13672 x_0_1*x_2_2 + 11480 x_0_1*x_2_3
      + 6096 x_0_1*x_2_4 + 8584 x_0_1*x_3_0 + 6216 x_0_1*x_3_1
      +

In [5]:
converter = QuadraticProgramToQubo()
qubo = converter.convert(qp)

print(qubo.export_as_lp_string())


\ This file has been generated by DOcplex
\ ENCODING=ISO-8859-1
\Problem name: Quadratic Assignment Problem

Minimize
 obj: - 6298948 x_0_0 - 6298948 x_0_1 - 6298948 x_0_2 - 6298948 x_0_3
      - 6298948 x_0_4 - 6298948 x_1_0 - 6298948 x_1_1 - 6298948 x_1_2
      - 6298948 x_1_3 - 6298948 x_1_4 - 6298948 x_2_0 - 6298948 x_2_1
      - 6298948 x_2_2 - 6298948 x_2_3 - 6298948 x_2_4 - 6298948 x_3_0
      - 6298948 x_3_1 - 6298948 x_3_2 - 6298948 x_3_3 - 6298948 x_3_4
      - 6298948 x_4_0 - 6298948 x_4_1 - 6298948 x_4_2 - 6298948 x_4_3
      - 6298948 x_4_4 + [ 6304980 x_0_0^2 + 6307372 x_0_0*x_0_1
      + 6309764 x_0_0*x_0_2 + 6309348 x_0_0*x_0_3 + 6309140 x_0_0*x_0_4
      + 6312172 x_0_0*x_1_0 + 6570 x_0_0*x_1_1 + 17184 x_0_0*x_1_2
      + 11256 x_0_0*x_1_3 + 17220 x_0_0*x_1_4 + 6310896 x_0_0*x_2_0
      + 11044 x_0_0*x_2_1 + 5310 x_0_0*x_2_2 + 10446 x_0_0*x_2_3
      + 3962 x_0_0*x_2_4 + 6307532 x_0_0*x_3_0 + 3404 x_0_0*x_3_1
      + 12876 x_0_0*x_3_2 + 7260 x_0_0*x_3_3 + 13132 x_0_0*x

In [6]:
# number of variables
num_vars = qubo.get_num_vars()
print('Number of variables:', num_vars)


Number of variables: 25


In [7]:
sys.path.append(os.path.abspath(os.path.join('..')))
from pce_qubo.pauli_correlation_encoding import PauliCorrelationEncoding
from pce_qubo.mixed_optimize import PauliCorrelationOptimizer

from pce_qubo.utility import Utility
from qiskit.quantum_info import SparsePauliOp, Statevector

In [8]:
pauli_encoder = PauliCorrelationEncoding()
k = 2
num_qubits = pauli_encoder.find_n(num_vars, k)
pauli_strings = SparsePauliOp(pauli_encoder.generate_pauli_strings(num_qubits, num_vars, k))
print(f"Number of qubits: {num_qubits} for k={k} with {num_vars} binary variables")

Number of qubits: 5 for k=2 with 25 binary variables


In [9]:
from qiskit_algorithms.optimizers import ADAM, POWELL, SLSQP, COBYLA, P_BFGS

In [10]:
# Initial setup
depth = 2 * num_qubits
ansatz = pauli_encoder.BrickWork(depth=depth, num_qubits=num_qubits)
optimizer = SLSQP(10) #P_BFGS() #
# optimizer = POWELL()

In [11]:
pce = PauliCorrelationOptimizer(pauli_encoder=pauli_encoder, depth = depth,qubo=qubo, k=k,
                                num_qubits=num_qubits)

In [None]:
import numpy as np
import logging
import matplotlib.pyplot as plt
import pickle

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Functions
def adaptive_perturbation(params, perturbation_factor, failure_count, historical_trend, max_factor=1e-2):
    """Adds directional and random perturbation based on historical trends."""
    random_perturbation = np.random.normal(0, perturbation_factor * (1 + failure_count / 5), size=params.shape)
    directional_perturbation = perturbation_factor * np.sign(historical_trend)
    return params + random_perturbation + directional_perturbation

def weighted_blended_initialization(previous_params, performance_weights, blend_factor=0.7):
    """Blends previous parameters with random ones, weighted by performance."""
    random_params = np.random.rand(len(previous_params))
    weighted_params = performance_weights * previous_params
    return blend_factor * weighted_params + (1 - blend_factor) * random_params

def initialize_within_range(num_params, lower_bound=-np.pi, upper_bound=np.pi):
    """Initializes parameters uniformly within a specified range."""
    return np.random.uniform(lower_bound, upper_bound, size=num_params)

def history_based_cooling_schedule(temperature, alpha, improvement_history, window=5):
    """Dynamically adjusts cooling based on recent improvement trends."""
    if len(improvement_history) >= window:
        recent_trend = sum(improvement_history[-window:])
        if recent_trend > 0:
            alpha = min(alpha * 1.05, 0.99)
        else:
            alpha = max(alpha * 0.95, 0.8)
    return max(temperature * alpha, 1e-8)

def save_checkpoint(filename, params, cost, round_num):
    """Saves the current state of optimization."""
    checkpoint = {'params': params, 'cost': cost, 'round_num': round_num}
    with open(filename, 'wb') as f:
        pickle.dump(checkpoint, f)

def load_checkpoint(filename):
    """Loads a saved optimization state."""
    with open(filename, 'rb') as f:
        return pickle.load(f)

def evaluate_perturbed_params(params, perturbation_factor, optimizer, pce, no_improvement_count, historical_trend):
    """Evaluate a perturbed parameter set and return both parameters and QUBO cost."""
    perturbed_params = adaptive_perturbation(params, perturbation_factor, no_improvement_count, historical_trend)
    optimized_params = pce.optimize(optimizer, perturbed_params)  # Perform optimization

    # Evaluate the QUBO cost for the optimized parameters
    final_ansatz = pauli_encoder.BrickWork(depth=depth, num_qubits=num_qubits).assign_parameters(optimized_params)
    psi_final = Statevector(final_ansatz)
    utility = Utility()
    qubo_bitstring = utility.evaluate_sign_function(psi_final, pauli_strings)
    qubo_cost = qubo.objective.evaluate(qubo_bitstring)

    return optimized_params, qubo_cost

# Initialization
logger.info("Step 1: Initializing parameters and variables.")
params = initialize_within_range(ansatz.num_parameters)
best_params = params.copy()  # Store the best parameters
best_qubo_cost = float('inf')  # Initialize best QUBO cost to infinity
best_qubo_bitstring = None  # Initialize best QUBO bitstring
no_improvement_count = 0  # Counter for consecutive rounds without improvement
max_no_improvement_rounds = 10  # Stop after 10 consecutive rounds without improvement
perturbation_factor = 1e-4  # Initial perturbation factor
decay_factor = 0.99  # Decay factor for perturbation
early_stopping_threshold = 1e-6  # Threshold for minimal improvement
fixed_threshold = 50  # Fixed improvement threshold
percentage_threshold = 0.001  # 0.1% of the best QUBO cost
improvement_history = []  # Track improvements for plotting
cumulative_improvement = 0  # Track cumulative improvement
historical_trend = np.zeros_like(params)  # Track parameter improvement trend
performance_weights = np.ones_like(params)  # Initialize performance weights

# Logging optimizer information
logger.info(f"Using {optimizer.__class__.__name__} optimizer.")
logger.info(f"Early stopping configured with fixed_threshold={fixed_threshold} and percentage_threshold={percentage_threshold * 100:.2f}%.")

round_num = 0
last_improvement_round = 0  # Track the round where the last significant improvement occurred

while no_improvement_count < max_no_improvement_rounds:
    round_num += 1
    logger.info(f"\n--- Round {round_num} ---")

    # Evaluate perturbations sequentially
    best_parallel_cost = float('inf')
    best_result = None

    for _ in range(4):  # Evaluate 4 perturbations sequentially
        result, qubo_cost = evaluate_perturbed_params(
            params, perturbation_factor, optimizer, pce, no_improvement_count, historical_trend
        )
        if qubo_cost < best_parallel_cost:
            best_parallel_cost = qubo_cost
            best_result = result

    result = best_result  # Use the best result from evaluations

    # Prepare the ansatz with the optimized parameters
    logger.info("Assigning optimized parameters to the ansatz...")
    final_ansatz = pauli_encoder.BrickWork(depth=depth, num_qubits=num_qubits).assign_parameters(result)
    psi_final = Statevector(final_ansatz)

    # Evaluate the QUBO cost and bitstring
    logger.info("Evaluating the QUBO cost and bitstring...")
    utility = Utility()
    qubo_bitstring = utility.evaluate_sign_function(psi_final, pauli_strings)
    qubo_cost = qubo.objective.evaluate(qubo_bitstring)
    logger.info(f"QUBO cost for this round: {qubo_cost}")
    market_share_bitstring = converter.interpret(qubo_bitstring)
    initial_feasible = qp.get_feasibility_info(market_share_bitstring)
    market_share_cost = qp.objective.evaluate(market_share_bitstring)
    logger.info(f"Market share cost: {market_share_cost}")
    logger.info(f"Initial feasibility: {initial_feasible}")

    # Check if the QUBO cost improved
    improvement = best_qubo_cost - qubo_cost
    improvement_history.append(improvement)
    cumulative_improvement += improvement
    historical_trend = np.sign(result - params) * improvement

    # Calculate dynamic threshold
    dynamic_threshold = max(fixed_threshold, percentage_threshold * best_qubo_cost)

    # Check if |qubo_cost| equals market_share_cost
    if abs(qubo_cost) == market_share_cost:
        logger.info(f"Stopping condition met: |QUBO cost| equals Market Share Cost.")
        logger.info(f"Final QUBO cost: {qubo_cost}, Market Share Cost: {market_share_cost}.")
        break

    if qubo_cost < best_qubo_cost:
        logger.info(f"Improvement detected! Previous best QUBO cost: {best_qubo_cost}, Current QUBO cost: {qubo_cost}")
        best_qubo_cost = qubo_cost
        best_params = result.copy()  # Save the best parameters
        best_qubo_bitstring = qubo_bitstring.copy()  # Save the best bitstring
        no_improvement_count = 0  # Reset no improvement counter
        perturbation_factor *= decay_factor  # Decay perturbation factor for precision
        last_improvement_round = round_num  # Update last improvement round

        # Perturb the trained parameters slightly
        logger.info("Perturbing the best parameters slightly for the next round.")
        params = adaptive_perturbation(best_params, perturbation_factor, no_improvement_count, historical_trend)
    else:
        logger.info(f"No improvement this round. Previous best QUBO cost remains: {best_qubo_cost}")
        no_improvement_count += 1

        # Apply perturbation or blended initialization
        if no_improvement_count % 2 == 0:
            logger.info("Applying blended initialization to explore new solutions.")
            params = weighted_blended_initialization(best_params, performance_weights)
        else:
            logger.info("Applying stronger perturbation to the best parameters.")
            params = adaptive_perturbation(best_params, perturbation_factor * 2, no_improvement_count, historical_trend)

    # Stop if no improvement for max_no_improvement_rounds consecutive rounds
    rounds_remaining = max_no_improvement_rounds - no_improvement_count
    logger.info(f"Consecutive no-improvement rounds: {no_improvement_count}. Rounds remaining before stopping: {rounds_remaining}.")

    if no_improvement_count >= max_no_improvement_rounds:
        logger.info(f"No improvement detected for {max_no_improvement_rounds} consecutive rounds.")
        logger.info(f"Early stopping triggered at round {round_num}.")
        break

    # Save checkpoints
    save_checkpoint("optimization_checkpoint.pkl", best_params, best_qubo_cost, round_num)

# Final Results
logger.info("\nOptimization complete.")
logger.info(f"Best QUBO cost: {best_qubo_cost}")
logger.info(f"Best QUBO bitstring: {best_qubo_bitstring}")
market_share_bitstring = converter.interpret(best_qubo_bitstring)
initial_feasible = qp.get_feasibility_info(market_share_bitstring)
market_share_cost = qp.objective.evaluate(market_share_bitstring)
logger.info(f"Market share cost: {market_share_cost}")


logger.info(f"Final feasibility: {initial_feasible}")

# Plot improvement history
plt.plot(improvement_history)
plt.title("Improvement Over Rounds")
plt.xlabel("Rounds")
plt.ylabel("Improvement")
# save the plot
plt.savefig("improvement_history.png")
plt.show()


INFO:__main__:Step 1: Initializing parameters and variables.
INFO:__main__:Using SLSQP optimizer.
INFO:__main__:Early stopping configured with fixed_threshold=50 and percentage_threshold=0.10%.
INFO:__main__:
--- Round 1 ---
INFO:__main__:Assigning optimized parameters to the ansatz...
INFO:__main__:Evaluating the QUBO cost and bitstring...
INFO:__main__:QUBO cost for this round: 19058325.0
INFO:__main__:Market share cost: 161481.0
INFO:__main__:Initial feasibility: (False, [], [<LinearConstraint: x_3_0 + x_3_1 + x_3_2 + x_3_3 + x_3_4 == 1 'assignment_constraint_facility_3'>, <LinearConstraint: x_4_0 + x_4_1 + x_4_2 + x_4_3 + x_4_4 == 1 'assignment_constraint_facility_4'>, <LinearConstraint: x_0_0 + x_1_0 + x_2_0 + x_3_0 + x_4_0 == 1 'assignment_constraint_location_0'>, <LinearConstraint: x_0_1 + x_1_1 + x_2_1 + x_3_1 + x_4_1 == 1 'assignment_constraint_location_1'>, <LinearConstraint: x_0_2 + x_1_2 + x_2_2 + x_3_2 + x_4_2 == 1 'assignment_constraint_location_2'>, <LinearConstraint: x_