# Grover Adaptive Search Algorithm Tests 2

This Python notebook provides a big test of Grover Adaptive search with a bigger example, separated from the `GAS_Tester_1.ipynb` notebook. All tests have been executed on the **OpenQASM Simulator** (https://sooluthomas.github.io/testTranslation/terra/executing_quantum_programs.html#executing-quantum-programs) offered by **Qiskit** (https://qiskit.org/), but any other backend (simulator or real quantum machine) can be used to run these tests.

In [None]:
from qiskit import *
from qiskit.visualization import plot_histogram

from random import getrandbits, randint, random, choices

from matplotlib import pyplot as plt
%matplotlib inline
from matplotlib.patches import Ellipse

import numpy as np

from binary_cost_function import *
from custom_gas import *
from num_base_converter import *

from functools import reduce

import pickle

### Manual tests

We will perform a bigger test of GAS to showcase a longer execution of the algorithm. We first create a randomized binary cost function. The following parameters control the nature of the cost function:
  - `num_bits`: Number of bits of the clauses
  - `spec`: Percentage of clause bits on $\mathtt{0}$ or $\mathtt{1}$
  - `v_min, v_max`: Random range of values of the clauses
  - `c_min, c_max`: Random range of number of clauses

In [None]:
num_bits = 10
spec = 0.50
v_min, v_max = -7, 7
c_min, c_max = 16, 16

bcf = generate_random_bcf(num_bits, spec, v_min, v_max, c_min, c_max)
n = bcf.num_bits
m = math.ceil(math.log(max([sum(filter(lambda x: x>0, bcf.get_values())) + 1, -sum(filter(lambda x: x<0, bcf.get_values()))]), 2)) + 2

print("n = ", n, ", m = ", m, sep = "")
print("bcf: ")
print(bcf)

This time, we will not reset the number of $G$ applications $r$ back to $1$ when finding a better string. Instead, every time we find a better string, we will leave $r$ untouched and run Grover Search again with the updated threshold. The stop condition will be the same: Stop when running Grover Search with $r = r_{max}$ does not output a better string. We will precalculate the best values of $r$ and only use these instead of trying all values from $0$ to $r_{max}$.

In [None]:
def grover_succ_prob(n, tg, r):
    return np.sin(((2.0*r) + 1.0)* np.arcsin(np.sqrt(tg/(2**n))))**2
    
def grover_rmax(n):
    return np.int(np.floor(np.pi / (4.0*np.arcsin(2.0**(-n/2.0)))))

def grover_opt_iters(n, tg):
    return np.int(np.floor(np.pi / (4.0*np.arcsin(np.sqrt(tg/(2**n))))))

def best_grover_r_list(n):
    r_max = grover_rmax(n)
    
    opt_iters = [grover_opt_iters(n, 2), grover_opt_iters(n, 1)]
    tg = 3
    
    while opt_iters[0] < opt_iters[1]:
        opt_iters.insert(0, grover_opt_iters(n, tg))
        tg += 1
                
    last_opt_iters = opt_iters[0]
    return list(range(last_opt_iters)) + opt_iters[1:]

In [None]:
def GAS_run(n, m, bcf, verbose = False):    
    #Precalculate best r values
    list_r = best_grover_r_list(n)
    j_max = len(list_r) - 1

    #Best strings and values: First are chosen randomly
    best_strs = [dec_to_bin(getrandbits(n), n)]
    best_vals = [bcf.evaluate(best_strs[-1])]
    read_strs = [best_strs[-1]]
    read_vals = [best_vals[-1]]
    r_appls = []

    if verbose:
        print(("Initial conditions: x={0:s}, y={1:4d}").format(
            best_strs[-1],
            best_vals[-1]
        ))
        print("Iterations with r=" + str(list_r) + "\n")

    j = 0
    num_iters = 1
    n_shots = 100000
    while j <= j_max:
        r = list_r[j]
        r_best = None
        tg = bcf.num_clauses_less(best_vals[-1])
        if tg != 0:
            r_best = grover_opt_iters(n, tg)
            if verbose:
                print(("Iteration {0:4d}:\n   Iteration params: r, r_best, tg = {1:3d}, {2:3d}, {3:4d}/{4:4d}").format(
                    num_iters,
                    r,
                    r_best,
                    tg,
                    2**n
                ))
                print(("   Theoretical success probability: {0:2.2f}%").format(
                    100*round(grover_succ_prob(n, tg, r), 4)
                ))
        else:
            if verbose: print("END: Best string found")
            return best_strs, best_vals, read_strs, read_vals, r_appls, sum(list_r[j:]), True
    
        #Create and run GAS quantum circuit
        # Threshold: Last best value
        qc = QuantumCircuit()
    
        qr_strs = QuantumRegister(n, "strs")
        qr_vals = QuantumRegister(m, "vals")
        qc.add_register(qr_strs)
        qc.add_register(qr_vals)
        
        cr_strs = ClassicalRegister(n, "cl_strs")
        cr_vals = ClassicalRegister(m, "cl_vals")
        qc.add_register(cr_strs)
        qc.add_register(cr_vals)
    
        qc.append(GAS_circuit(m, bcf, best_vals[-1], r).to_gate(), qr_strs[:] + qr_vals[:])
        qc.measure(qr_strs, cr_strs)
        qc.measure(qr_vals, cr_vals)
        
        job = execute(qc, Aer.get_backend("qasm_simulator"), shots = n_shots)
        counts = job.result().get_counts()
        r_appls.append(r)
        #ideal_r_appls.append(r_best)
        
        #Calculate empirical success probability
        good_n_shots = sum(map(lambda kv: kv[1], filter(lambda kv: kv[0][0] == "1", counts.items())))
        if verbose: print(("   Empirical success probability:   {0:2.2f}%").format(
            100*round(float(good_n_shots)/n_shots, 4)
        ))
        
        #Retrieve one random result and get read_str and read_val
        results = choices(list(counts.keys()), list(counts.values()))[0].split(" ")
        #results = list(counts.keys())[0].split(" ")
        read_str = results[1]
        read_val_bin = results[0]
        read_val = bin_ca2_to_dec(results[0])
        read_corr_val = read_val + best_vals[-1]
        read_strs.append(read_str)
        read_vals.append(read_val + best_vals[-1])

        if verbose: print(("   Read results: x, y = {0:s}, {2:4d} ({1:s})").format(
            read_str,
            read_val_bin,
            read_corr_val
        ))

        #If read_corr_val is better (smaller) than best_vals[-1]: Update threshold and reset r to 1
        #If not: Do not update threshold, increase r by 1
        if read_corr_val < best_vals[-1]:
            best_strs.append(read_str)
            best_vals.append(read_corr_val)
            if verbose:
                print("     Success", end = "")
                if r != r_best: print("!!!", end = "")
                print(" -> New threshold = {0:4d}\n".format(
                    best_vals[-1]
                ))
        else:
            best_strs.append(best_strs[-1])
            best_vals.append(best_vals[-1])
            j += 1
            if verbose:
                print("     Failure", end = "")
                if r == r_best: print("!!!\n")
                else: print("\n")
    
        #Update number of iterations
        num_iters += 1
    
    if verbose: print("END: All r values tried, best string not found")
    return best_strs, best_vals, read_strs, read_vals, r_appls, 0, False

Execute the next cell to run the test. <span style="color:red">**Warning:**</span> The tests may take a few minutes to complete (up to **60 mins** for standard params).

A "!!!" denotes an unexpected result (Success or Failure when the opposite was expected). An unexpected Failure implies that the algorithm has a high chance of outputting a non-optimal string. Unexpected Successes do not affect the chances of optimal string outputting.

We will stop the execution if we output the best string to save time. In a real Grover Adaptive Search we would not stop as we would have no way of verifying the string's optimality. Nevertheless, we add to the total the $G$ applications used if we had nod stopped.

To make sure the algorithm succeeded, we exhaustively evaluate the binary cost function and compare with the algorithm's output.

In [None]:
best_strs, best_vals, read_strs, read_vals, r_appls, extra_r_appls, is_successful_run = GAS_run(n, m, bcf, verbose = True)

best_str, best_val = bcf.min_exhaustively()
print()
if is_successful_run: print("   Status: SUCCESS")
else: print("   Status: FAILURE")
print("Expected string: {0:s}".format(best_str))
print("Returned string: {0:s}".format(best_strs[-1]))
print("Expected value: {0:d}".format(best_val))
print("Returned value: {0:d}".format(best_vals[-1]))
print("Total G applications: {0:d}".format(sum(r_appls) + extra_r_appls))

In [None]:
def view_GAS_results(best_strs, best_vals, read_strs, read_vals, r_appls, extra_r_appls, is_successful_run):
    width, height = 25, 10
    fig = plt.figure(figsize=(width, height))
    ax = plt.subplot(1, 1, 1)
    num_data = len(best_vals)
    min_y, max_y = bcf.min_exhaustively()[1], max(read_vals)
    x = np.arange(1, num_data+1)
    ax.set_xlim(0.75, num_data + 0.25)
    ax.set_ylim(min_y - 1, max_y + 1)
    ax.axhline(min_y, color = "red", linestyle = "--")
    ax.plot(x, best_vals, color = "#4d94ff")
    ax.scatter(x, best_vals, s = 750, color = "#4d94ff", label = "Best solution")
    ax.scatter(x, read_vals, s = 150, color = "#e65c00", label = "Read solution")
    ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.05), fontsize = "xx-large")
    ax.set_title("Read and best solutions", fontsize = "xx-large")
    ax.grid()
    d = 0
    w = width / (num_data - 0.5)
    h = height / (max_y - min_y + 2)
    r = 0.5
    for i in range(num_data - 1):
        y1, y2 = best_vals[i], best_vals[i+1]
        '''
        if y1 != y2 and r_appls[i] != ideal_r_appls[i]:
            ax.add_patch(Ellipse(
                (i + 1.5, ((y1 + y2) / 2)),
                width = r / w,
                height = r / h,
                color = "green",
                alpha = 0.3,
                linewidth = 1
            ))
        if y1 == y2 and r_appls[i] == ideal_r_appls[i]:
            ax.add_patch(Ellipse(
                (i + 1.5, ((y1 + y2) / 2)),
                width = r / w,
                height = r / h,
                color = "red",
                alpha = 0.3,
                linewidth = 1
            ))
        '''
        if y1 != y2:
            ax.add_patch(Ellipse(
                (i + 1.5, ((y1 + y2) / 2)),
                width = r / w,
                height = r / h,
                color = "black",
                fill = False,
                linewidth = 1
            ))
        ax.text(
            i + 1.5,
            ((y1 + y2) / 2),
            str(r_appls[i]),
            verticalalignment = "center",
            horizontalalignment = "center",
            fontsize = 20
        )
    plt.show()

Execute the next cell to visualize the execution process.

In [None]:
view_GAS_results(best_strs, best_vals, read_strs, read_vals, r_appls, extra_r_appls, is_successful_run)

### Automatic tests

To estimate the success probability of the algorithm, we run the algorithm many times and save the results on a sepparate file. Files are identified by the `md5` hash of the binary cost function.

In [None]:
def write_results(bcf, best_strs, best_vals, read_strs, read_vals, r_appls, extra_r_appls, is_successful_run):
    results_filename = "GAS_results/results_" + bcf.generate_hash_ID() + ".txt"
    
    if not os.path.isfile(results_filename):
        results_file = open(results_filename, "ab")
        pickle.dump(bcf, results_file)
        results_file.close()
        
    results_file = open(results_filename, "ab")
    pickle.dump({
        "best_strs": best_strs,
        "best_vals": best_vals,
        "read_strs": read_strs,
        "read_vals": read_vals,
        "r_appls": r_appls,
        "extra_r_appls": extra_r_appls,
        "is_successful_run": is_successful_run
    }, results_file)
    results_file.close()
    
def read_bcf_and_results(results_filename):    
    results_file = open(results_filename, "rb")
    results_bcf = pickle.load(results_file)
    results_list = []
    try:
        while True:
            results_list.append(pickle.load(results_file))
    except EOFError:
        pass
    results_file.close()
    return results_bcf, results_list

def read_bcf(results_filename):    
    results_file = open(results_filename, "rb")
    results_bcf = pickle.load(results_file)
    return results_bcf

def view_GAS_read_results(read_results):
    view_GAS_results(
        read_results["best_strs"],
        read_results["best_vals"],
        read_results["read_strs"],
        read_results["read_vals"],
        read_results["r_appls"],
        read_results["extra_r_appls"],
        read_results["is_successful_run"]
    )

Run this cell to generate a new binary cost function.

In [None]:
num_bits = 8
spec = 0.66
v_min, v_max = -10, 10
c_min, c_max = 40, 40

bcf = generate_random_bcf(num_bits, spec, v_min, v_max, c_min, c_max)
n = bcf.num_bits
m = math.ceil(math.log(max([sum(filter(lambda x: x>0, bcf.get_values())) + 1, -sum(filter(lambda x: x<0, bcf.get_values()))]), 2)) + 2

print("n = ", n, ", m = ", m, sep = "")
print("bcf: ")
print(bcf)

Run this cell to run automatic, non-verbose Grover Adaptive Search tests on the random binary cost function and save the results into a file. After each run test, the results are saved into the file, so if the tests are stopped, all the previous run test data will not be lost.

In [None]:
n_runs = 124

print("BCF hash: " + bcf.generate_hash_ID())
results_filename = "GAS_results/results_" + bcf.generate_hash_ID() + ".txt"

for n_run in range(n_runs):
    print("Run {0:d}/{1:d}... ".format(n_run+1, n_runs), end = "")
    best_strs, best_vals, read_strs, read_vals, r_appls, extra_r_appls, is_successful_run = GAS_run(n, m, bcf, verbose = False)
    write_results(
        bcf,
        best_strs,
        best_vals,
        read_strs,
        read_vals,
        r_appls, 
        extra_r_appls,
        is_successful_run
    )
    print("Done ", end = "")
    if is_successful_run: print("(Success)")
    else: print("(Failure)")

Alternatively, provide the `md5` hash of the binary cost function that appears on a results file and run the following cell to recover the binary cost function definition and run more tests to save on the same file.

In [None]:
#md5_bcf = "41f3748b586b6e336e6f1dd5e2d621f3"
#md5_bcf = "f41e67fdf60afdb63f7713d8c8a8853d"

n_runs = 109

results_filename = "GAS_results/results_" + md5_bcf + ".txt"
bcf = read_bcf(results_filename)
print(bcf)
print("BCF hash: " + bcf.generate_hash_ID())
n = bcf.num_bits
m = math.ceil(math.log(max([sum(filter(lambda x: x>0, bcf.get_values())) + 1, -sum(filter(lambda x: x<0, bcf.get_values()))]), 2)) + 2

for n_run in range(n_runs):
    print("Run {0:d}/{1:d}... ".format(n_run+1, n_runs), end = "")
    best_strs, best_vals, read_strs, read_vals, r_appls, extra_r_appls, is_successful_run = GAS_run(n, m, bcf, verbose = False)
    write_results(
        bcf,
        best_strs,
        best_vals,
        read_strs,
        read_vals,
        r_appls, 
        extra_r_appls,
        is_successful_run
    )
    print("Done ", end = "")
    if is_successful_run: print("(Success)")
    else: print("(Failure)")

Provide an `md5` hash of the binary cost function that appears on a results file and run the following cell to output the estimated the success probability using the results saved on the file.

In [None]:
md5_bcf = "41f3748b586b6e336e6f1dd5e2d621f3"
#md5_bcf = "f41e67fdf60afdb63f7713d8c8a8853d"

results_filename = "GAS_results/results_" + md5_bcf + ".txt"
print("Results from " + results_filename)
bcf, results_list = read_bcf_and_results(results_filename)
n = bcf.num_bits
m = math.ceil(math.log(max([sum(filter(lambda x: x>0, bcf.get_values())) + 1, -sum(filter(lambda x: x<0, bcf.get_values()))]), 2)) + 2

total_results = 0
total_success = 0
total_G_appls = 0
for results in results_list:
    total_results += 1
    total_success += results["is_successful_run"] == True
    total_G_appls += sum(results["r_appls"]) + results["extra_r_appls"]
    
print("n, m = {0:d}, {1:d}".format(n, m))
print(bcf)
print("   Number of tests: {0:d}".format(total_results))
print("      Success rate: {0:2.2f}%".format(100*round(float(total_success)/total_results, 4)))
print("Avg G applications: {0:f}".format(float(total_G_appls)/total_results))

After loading the results with the previous cell, run this cell to visualize the executions of some of the read tests.

In [None]:
for results in choices(results_list, k = 10):
    view_GAS_read_results(results)

The tests can be performed with any binary cost function, as the number of qubits, maximum rotations, and other parameters are set automatically for any chosen binary cost function.

One should note, though, that simulation times scale exponentially with the number of qubits, meaning that the more complex the cost function, the much longer simulations will take to complete. Also, one should note that Qiskit's **OpenQASM Simulator**  has a hard cap of 30 qubits for simulations.