# Dynex' Q分数（Q-Score）

Q-分数（Q-score）用于衡量运行代表性量子应用程序的效率，系统处理实际问题的效果，而不是其理论或物理性能。[1]。Dynex神经形态计算平台的Q-分数计算是基于Atos的官方包来测量Q-分数的[2]。

在“评估量子退火器的Q-Score”一文中，Ward van der Schoot等人计算了一系列D-Wave量子设备、经典算法和混合量子-经典系统的Q-Score。公共的Q-Score软件包[2]已被用于生成Dynex Neuromorphic Computing Platform的Q-Score。

In [17]:
import dynex
import dimod
from pyqubo import Array

### Dynex SDK 版本

In [18]:
dynex.__version__

'0.1.10'

In [19]:
import networkx as nx
from collections import defaultdict
import numpy as np
from datetime import datetime
import pickle
from dimod import BinaryQuadraticModel, BINARY
from tqdm.notebook import tqdm

## 导入 Q-Score 包

In [20]:
def draw_sampleset(G, sampleset):
    """
    应用'sampleset'绘制图'G'的函数
    """
    lut = sampleset.first.sample

    # Interpret best result in terms of nodes and edges
    S0 = [node for node in G.nodes if not lut[node]]
    S1 = [node for node in G.nodes if lut[node]]
    cut_edges = [(u, v) for u, v in G.edges if lut[u]!=lut[v]]
    uncut_edges = [(u, v) for u, v in G.edges if lut[u]==lut[v]]

    # Display best result
    pos = nx.spring_layout(G)
    nx.draw_networkx_nodes(G, pos, nodelist=S0, node_color='r')
    nx.draw_networkx_nodes(G, pos, nodelist=S1, node_color='c')
    nx.draw_networkx_edges(G, pos, edgelist=cut_edges, style='dashdot', alpha=0.5, width=3)
    nx.draw_networkx_edges(G, pos, edgelist=uncut_edges, style='solid', width=3)
    nx.draw_networkx_labels(G, pos)

In [21]:
def run_job(size, depth = 1, seed=None, plot=False, strength = 10, generate_random = False, debug = False):
    """
    （重新）实现了Atos的generate_maxcut_job()函数，该函数在job_generation.py中指定。它生成一个给定大小的随机Erdos-Enyi图，将图转换为QUBO公式，返回两组以及切割的数量（=能量基态），以与论文一致。

    参数：
    -----------
    - size (int): 最大切割问题图的大小
    - depth (int): 问题的深度
    - seed (int): 随机种子
    - plot (boolean): 绘制图形
    - strength (int): qubo公式边的权重
    - generate_random (boolean): 随机分配的实现，可以替代论文中的0.178 * pow(size, 3 / 2)。
    
    Returns:
    --------
    - Set 0 (list)
    - Set 1 (list)
    - maximum_cut result (int)
    - random_cut result (int)
    """
    # 创建一个给定大小的Erdos-Renyi图：
    G = nx.generators.erdos_renyi_graph(size, 0.5, seed=seed);
    if debug:
        print('Graph generated. Now constructing Binary Quadratic Model...')
    if plot:
        nx.draw(G);
    
    # 将图转换为Qubo：
    #Q = defaultdict(int)
    # Update Q matrix for every edge in the graph
    #for i, j in G.edges:
    #    Q[(i,i)]+= -1 * strength;
    #    Q[(j,j)]+= -1 * strength;
    #    Q[(i,j)]+=  2 * strength;
    #bqm = dimod.BinaryQuadraticModel.from_qubo(Q, 0.0);
    
    # 我们直接构建一个二进制二次模型（更快）：
    _bqm = BinaryQuadraticModel.empty(vartype=BINARY);
    for i, j in tqdm(G.edges):
        _bqm.add_linear(i, -1 * strength);
        _bqm.add_linear(j, -1 * strength);
        _bqm.add_quadratic(i,j, 2 * strength);
        
    if debug:
        print('BQM generated. Starting sampling...');
    
    # 在Dynex平台上进行采样：
    model = dynex.BQM(_bqm, logging=False);
    sampler = dynex.DynexSampler(model, mainnet=False, description='Dynex SDK test', logging=False);
    sampleset = sampler.sample(num_reads=500000, annealing_time = 300, debugging=False);
    cut = (sampleset.first.energy * -1 ) / strength;
    print('Ground state cut = ',cut);
    
    # 随机 cut?
    r_cut = -1;
    if generate_random:
        random_assignment = list(np.random.randint(0, 2, size))
        r_assignment = dimod.SampleSet.from_samples_bqm(random_assignment, _bqm)
        r_cut = (r_assignment.first.energy * -1 ) / strength;
    
    if plot:
        draw_sampleset(G, sampleset)
    
    return cut, r_cut
    

In [22]:
_NB_INSTANCES_PER_SIZE = 5 #100
_INITIAL_SIZE = 5
_DEFAULT_SIZE_LIMIT = 20
_DEFAULT_DEPTH = 1
_DEFAULT_OUT_FILE = "out.csv"
_DEFAULT_RAW_FILE = "out.raw"
beta = 0.2
seed = 1234

In [23]:
_INTRO = """=== Running Q-score benchmark | {date} ===
Instances size:    {init_size} -- {final_size}
Beta:              {beta}
Ansatz depth:      {depth}
Output file:       {output}
Raw output file:   {rawdata}
Random seed:       {seed}
================================="""

_HEADER = """# Q-Score run | {date}
# Instances size:    {init_size} -- {final_size}
# Ansatz depth:      {depth}
# Beta:              {beta}
# Output file:       {output}
# Raw output file:   {rawdata}
# Random seed:       {seed}
# size, avg. score, avg. random score
"""

In [24]:
def _exhaustive(start_size, end_size):
    """
    迭代遍历域的所有值，直到找到一个负值。
    """
    values = dict()
    for index in range(start_size, end_size + 1, 10):
        value = yield index
        values[index] = value
        if value < 0:
            if index == start_size:
                return False, value, (False, start_size)
            return True, values, index - 1
    return False, values, (True, max(values), values[max(values)])


def _dichotomic(start_size, end_size):
    """"""
    lower = start_size
    upper = end_size
    value = yield lower
    values = dict()
    values[lower] = value
    value = yield upper
    values[upper] = value

    if values[upper] > 0:
        return False, values, (True, max(values), values[max(values)])
    if values[lower] < 0:
        return False, value, (False, start_size)
    while True:
        if abs(upper - lower) <= 1:
            return True, values, lower
        next_index = (upper + lower) // 2
        values[next_index] = yield next_index
        if values[next_index] < 0:
            upper = next_index
        else:
            lower = next_index


GENERATORS = {"exhaustive": _exhaustive, "dichotomic": _dichotomic}

In [25]:
class Driver:
    """
    驱动与迭代器的交互。

    论点:
        fun(callable): 评估函数，它应该接受一个索引并返回一个分数。
        iteration(str): 可选值为 "exhaustive" 或 "dichotomic"。
        start_size(int): 起始大小（即最低索引）。
        end_size(int): 结束大小（即最高索引）。
    """

    def __init__(self, fun, iteration, start_size, end_size):
        if iteration not in GENERATORS:
            raise ValueError(f"Unknown iteration method {iteration}")
        self.generator = GENERATORS[iteration](start_size, end_size)
        self.fun = fun

    def run(self):
        """
        运行迭代并返回一个元组，其中包含：
        成功状态（True，如果存在一个索引，使得 f(index) > 0 且 f(index + 1) <= 0，否则为 False）
        一个包含所有评估点的映射 <index, value>
        如果找到，索引，使得 f(index) > 0 且 f(index + 1) <= 0
        """
        index = next(self.generator)
        while True:
            try:
                index = self.generator.send(self.fun(index))
            except StopIteration as exp:
                return exp.value

In [26]:
class QScore:
    # pylint: disable=too-many-instance-attributes
    """

    论点:
        qpu (:class:~qat.core.qpu.QPUHandler): 用于基准测试的 QPU（包括其编译栈）。QPU 应支持变分优化。
        initial_size (int, 可选): 尝试的初始实例大小。默认为 5。
        size_limit (int, 可选): 试图解决的 MAX-CUT 实例大小限制。实例大小将在 5 到此限制之间变化。默认为 20。
        beta (float, 可选): 测试的阈值比率。官方测试使用 20%（0.2）作为阈值。默认为 0.2。
        iterator (str, 可选): 要使用的迭代方法（"exhaustive" 或 "dichotomic"）。默认为 "dichotomic"。
        depth (int, 可选): 要使用的 QAOA 深度。默认为 1。
        output (str, 可选): 用于存储基准测试输出的文件名（以 CSV 格式）。默认为 out.csv。
        rawdata (str, 可选): 用于存储基准测试期间执行的所有运行的原始输出的文件名。默认为 out.raw。
        seed (int, 可选): 用于生成实例的种子。
    """

    def __init__(
        self,
        qpu,
        size_limit=_DEFAULT_SIZE_LIMIT,
        initial_size=_INITIAL_SIZE,
        beta=0.2,
        iterator= "exhaustive", # "dichotomic",
        depth=_DEFAULT_DEPTH,
        output=_DEFAULT_OUT_FILE,
        rawdata=_DEFAULT_RAW_FILE,
        seed=None,
    ):
        self._executor = qpu
        self._size_limit = size_limit
        self._iterator = iterator
        self._initial_size = initial_size
        self._depth = depth
        self._output = output
        self._rawdata = rawdata
        self._seed = seed if seed is not None else np.random.randint(100000)
        self._beta = beta

    def run(self):
        """
        运行基准测试。
        """
        date_string = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        print(
            _INTRO.format(
                date=date_string,
                init_size=self._initial_size,
                final_size=self._size_limit,
                beta=self._beta,
                depth=self._depth,
                output=self._output,
                rawdata=self._rawdata,
                seed=self._seed,
            )
        )
        all_data = {}
        seed = self._seed
        to_output = _HEADER.format(
            date=date_string,
            init_size=self._initial_size,
            beta=self._beta,
            final_size=self._size_limit,
            depth=self._depth,
            output=self._output,
            rawdata=self._rawdata,
            seed=self._seed,
        )

        def _evaluate_point(size, seed=seed, to_output=to_output, self=self):
            """
            计算单点的函数。
            """
            print(f"Running for n={size:2d}.", end=" ", flush=True)
            scores = []
            data = []
            for _ in range(_NB_INSTANCES_PER_SIZE):
                
                #job = generate_maxcut_job(size, self._depth, seed=seed)
                #result = self._executor.submit(job)
                #result = -cut; # cut is already positive
                #scores.append(-result.value)
                #data.append({"seed": seed, "score": -result.value})
                #seed += 1
                
                # 以上内容已被 Dynex 采样函数 run_job 替换。
                cut, r_cut = run_job(size, depth = self._depth, seed=None, plot=False, strength = 1000);
                scores.append(cut);
                data.append({"seed": seed, "score": cut})
                seed += 1
                #print('    cut: ', cut)
                
            average_score = np.mean(scores) - size * (size - 1) / 8
            avg_best_score = 0.178 * pow(size, 3 / 2)
            print(f"Score: {average_score:.2f}.", end=" ")
            print(f"Random best score: {avg_best_score:.2f}.", end="\t")
            to_output = f"{size},{average_score},{avg_best_score}\n"
            all_data[size] = data
            pickle.dump(all_data, open(self._rawdata, "wb"))
            with open(self._output, "a") as fout:
                fout.write(to_output)
            achieved_ratio = average_score / avg_best_score
            if achieved_ratio > self._beta:
                print("Success.", "beta = ",achieved_ratio)
            else:
                print("Fail.", "beta = ", achieved_ratio)
            return achieved_ratio - self._beta

        success, _, info = Driver(_evaluate_point, self._iterator, self._initial_size, self._size_limit).run()
        
        print('   ', info)

        if success:
            print(f"Success. QScore({self._beta}) = {info}")
        else:
            if info[0]:
                print(f"Failure. QScore({self._beta}) > {info[1]}")
                print("Maybe try to increase the max instance size !")
            else:
                print(f"Failure. QScore({self._beta}) < {info[1]}")


# 运行 Q-Score 

根据论文，Q-Score 被定义为当 beta 低于 0.2 时的大小 'N'。要评估这一点，需要迭代地尝试从 5 到 x 的各种大小。所有运行的结果都存储在 "out.csv" 中，也可以用于绘制类似 [1] 中呈现的图表。运行将在达到 beta 低于阈值的情况下返回 'N' 的成功。

In [None]:
QScore(None, size_limit = 180, depth = 1, output = 'out.csv', rawdata = 'out.raw', seed = 1234).run()

=== Running Q-score benchmark | 04/11/2023 23:26:04 ===
Instances size:    5 -- 180
Beta:              0.2
Ansatz depth:      1
Output file:       out.csv
Raw output file:   out.raw
Random seed:       1234
Running for n= 5. 

  0%|          | 0/6 [00:00<?, ?it/s]

# 绘制结果

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_csv('out.csv', names = ["n","score","random_score"], header = None)
df['beta'] = df['score'] / df['random_score']

In [None]:
plt.figure(figsize=(8,6))
plt.bar(df.n,0.05, width=1, bottom=df.beta-0.025, color='#000000')
plt.plot(df['n'], df['beta'], linestyle='None', marker='o')
plt.title('Dynex Neuromorphic Platform')
plt.xlabel('Problem size N')
plt.ylabel('Beta')
plt.axhline(y = 0.2, color = 'r', linestyle = '--') 
plt.tight_layout()
plt.show()

## 参考文献

[1] Martiel S, Ayral T, Allouche C. 在以应用为中心、硬件无关和可扩展的方式中对量子协处理器进行基准测试。IEEE量子工程交易。2021年6月17日;2:1-1。

[2] Atos用于计算Atos Q-Score的软件包：
https://github.com/myQLM/qscore/blob/master/qat/qscore/https://github.com/myQLM/qscore/blob/master/qat/qscore/