# Extend Janus-CT To Identify Bugs in the Quantum Circuit

**Author:** Congliang Lang \& Siwei Tan  

**Date:** 15/4/2024

Based on "[QuCT: A Framework for Analyzing Quantum Circuit by Extracting Contextual and Topological Features (MICRO 2023][1]"

[1]: https://scholar.google.com/scholar_url?url=https://dl.acm.org/doi/abs/10.1145/3613424.3614274%3Fcasa_token%3DffjIB1hQ4ZwAAAAA:8MajDLrDOC74WoeMf7r7AoQ-koxCa4E1TNqQg3GSDz03xUX6XdE3toNTM-YdM_e4rKEusMceJ6BGJg&hl=zh-CN&sa=T&oi=gsb&ct=res&cd=0&d=11146218754516883150&ei=42YSZpPlFL6s6rQPtt6x6Ac&scisig=AFWwaeYaiu2hyx8HUJ_7Buf9Mwom

The vectorization of Janus-CT can be extended to more downstream tasks. For example, in this notebook, we use Janus-CT to identify the potential bugs in the quantum algorithm implementation. We apply a data driven method that traing a model to predict the error rate 

In [1]:
import sys
sys.path.append('..')
import os
os.chdir("..")
import logging
logging.basicConfig(level=logging.WARN)
import ray
ray.init(log_to_driver=False)

from janusq.data_objects.algorithms import get_algorithm_circuits
import random
import seaborn as sns
import numpy as np
from collections import defaultdict
import pandas as pd
import jax.numpy as jnp

from janusq.analysis.vectorization import RandomwalkModel

from janusq.tools.ray_func import map
from janusq.data_objects.backend import LinearBackend
import copy
import statistics
from janusq.data_objects.circuit import Circuit

2024-04-16 11:28:22,686	INFO worker.py:1724 -- Started a local Ray instance.


In [2]:
from collections import Counter

class BugIdentificationModel:
    def __init__(self, vec_model: RandomwalkModel) -> None:
        self.vec_model = vec_model
    
    def train(self, algorithm_to_circuirts: dict[str, list[Circuit]]):
        self.total_vecs = []
        self.functionalities = []
        
        for algorithm, circuits in algorithm_to_circuirts.items():
            for circuit in circuits:
                vecs = self.vec_model.vectorize(circuit)
                self.total_vecs += vecs
                self.functionalities += [algorithm] * len(vecs)

    def identify_bug(self, circuit, top_k = 3):
        gate_vecs = self.vec_model.vectorize(circuit)
        
        functionalities_per_gate = []
        all_functionalities = []
        for analyzed_vec in gate_vecs:
            dists = np.sqrt(np.sum((self.total_vecs - analyzed_vec)**2, axis=1))
            
            top_dist_indices = np.argsort(dists)[:top_k]
            nearest_functionalities = self.functionalities[top_dist_indices]
            # nearest_dists = dists[top_dist_indices]
            
            functionalities_per_gate.append(nearest_functionalities)
            all_functionalities += nearest_functionalities
        
        top_functionalities = Counter(all_functionalities).most_common(top_k)
        
        error_gate_indices = []
        for i, possible_functionalities in enumerate(functionalities_per_gate):
            if any([functionality in top_functionalities for functionality in possible_functionalities]):
                continue
            error_gate_indices.append(i)
        
        return error_gate_indices



In [3]:
# def scan(n_qubits, n_steps, n_walks, n_bugs):
#     alg2success_num, alg2bug_num, alg2error_num, alg_to_n_identified = defaultdict(int), defaultdict(int),defaultdict(int),defaultdict(int)


                    
#         alg_to_n_identified[alg_name] += 1

#         alg2bug_num[alg_name] += len(bug_gate_ids)
#         for bug_gate_idx in bug_gate_ids:
#             if alg_name not in [algorithms[i] for i in gate_to_nearest_circuits[bug_gate_idx]]:
#                 alg2success_num[alg_name] += 1

    
#     def bug_detect(alg_name, n_qubits):
#         circuit_id = f'{alg_name}_{n_qubits}'

#         current_circuit = id2circuit[circuit_id]
        
#         bug_circuit, bug_gate_ids = construct_negatives(current_circuit, n_bugs = n_bugs) 

#         find_bug(bug_circuit, bug_gate_ids)


#     for alg_name in algorithms:
#         bug_detect(alg_name, n_qubits)


#     print('bug_size', n_bugs)
#     print('alg2success_num=', alg2success_num)
#     print('alg2bug_num=', alg2bug_num)
#     print('alg2error_num=', alg2error_num)
#     print('alg2identify_num=', alg_to_n_identified)
    
#     return alg2success_num, alg2bug_num, alg2error_num, alg_to_n_identified
    
# @ray.remote(max_calls=6)
# def scan_remote(n_qubits, n_steps, n_walks, n_bugs):
#     return scan(n_qubits, n_steps, n_walks, n_bugs)

In [4]:
def construct_negatives(circuit, n_bugs, basis_gates):
    
    n_qubits = circuit.n_qubits
    bug_circuit = copy.deepcopy(circuit)
    
    

    bug_start = random.randint(0, max(circuit.n_gates - 1 - n_bugs, 1))
    bug_end = bug_start + n_bugs
    bug_gate_ids = list(range(bug_start, min(bug_end, circuit.n_gates)))

    for bug_gate_id in bug_gate_ids:
        
        gate = bug_circuit.gates[bug_gate_id]

        name = random.choice(basis_gates) # ['rx', 'ry', 'rz', 'h', 'cz', 'cx']

        params = np.random.random((3,1)) * 2 * np.pi
        params = params.tolist()
        
        qubit1 = random.randint(0, n_qubits - 1)
        qubit2 = random.choice([qubit for qubit in range(n_qubits) if qubit != qubit1])
        qubits = [qubit1, qubit2]
        
        gate['name'] = name
        if name in ('rx', 'ry', 'rz'):
            gate['qubits'] = qubits[:1]
            gate['params'] = params[:1]
            
        elif name in ('cz', 'cx'):
            gate['qubits'] = qubits
            gate['params'] = []
            
        elif name in ('h'):
            gate['qubits'] = qubits[:1]
            gate['params'] = []
            
        elif name in ('u'):
            gate['qubits'] = qubits[:1]
            gate['params'] = params
            
        else:
            logging.error("no such gate")
            return circuit

    bug_circuit.name = bug_circuit.name
    return bug_circuit, bug_gate_ids

In [5]:
algorithm_to_circuirts = ['qft', 'hs', 'ising', 'qknn', 'qsvm', 'vqc', 'ghz', 'grover']
algorithm_to_circuirts = defaultdict(list)
algorithm_circuits = []

backend = LinearBackend(10)
vec_model = RandomwalkModel(n_steps = 3, n_walks = 30, backend = backend)

for n_qubits in range(5, backend.n_qubits):
    for algorithm, circuit in zip(algorithm_to_circuirts, get_algorithm_circuits(n_qubits, backend, algorithm_to_circuirts)):
        algorithm_to_circuirts[algorithm].append(circuit)
        algorithm_circuits.append(circuit)

vec_model.train(algorithm_circuits)

0it [00:00, ?it/s]


In [6]:
bug_indentify_model = BugIdentificationModel(vec_model)
bug_indentify_model.train(algorithm_to_circuirts)

In [7]:
circuit = random.choice(algorithm_circuits)
bug_circuit, bug_gate_ids = construct_negatives(circuit, n_bugs=4, basis_gates= backend.basis_gates)
predict_indices = bug_indentify_model.identify_bug(bug_circuit)

correct_rate = 0
for predict_indice in predict_indices:
    if predict_indice in bug_gate_ids:
        correct_rate+=1

print(str.format("correct_rate: {}"),  correct_rate * 100 / len(bug_gate_ids))

IndexError: list index out of range

In [None]:
results = {}
qubit_range = list(range(5,8))
for n_qubits in qubit_range:
    for n_steps in range(2, 5):
        n_walks = n_steps * 16
        for n_bugs in range(1, 4):
            key = (n_qubits, n_steps, n_walks, n_bugs)
            
            future = scan_remote.remote(*key)
            results[key] = future

In [None]:
for key, future in results.items():
    result = ray.get(results[key])
    results[key] = result

In [None]:
results

In [None]:
alg_to_n_qubit_to_n_success = defaultdict(lambda: defaultdict(int))
alg_to_n_qubit_to_n_bugs = defaultdict(lambda: defaultdict(int))


for key, result in results.items():
    n_qubits, n_steps, n_walks, n_bugs = key

    for alg_name in algorithm_to_circuirts:
        alg_name = f'{alg_name}_{n_qubits}'
        alg_to_n_qubit_to_n_success[alg_name][n_qubits] += result[0][alg_name]

    for alg_name in algorithm_to_circuirts:
        alg_name = f'{alg_name}_{n_qubits}'
        alg_to_n_qubit_to_n_bugs[alg_name][n_qubits] += result[1][alg_name]
            


In [None]:
def combine(dict1, dict2, default_value = 1):
    all_values = []
    new_dict = defaultdict(lambda: defaultdict(lambda: default_value))
    for k1, v1 in dict1.items():
        for k2, v2 in v1.items():
            new_dict[k1][k2] = v2 / dict2[k1][k2]
            all_values.append(v2 / dict2[k1][k2])
    
    return new_dict, np.array(all_values)

In [None]:
alg2n_qubit2success_rate, all_values = combine(alg_to_n_qubit_to_n_success, alg_to_n_qubit_to_n_bugs)
alg2n_qubit2success_rate
# , np.mean(all_values)

In [None]:
data = np.zeros((len(qubit_range) , len(algorithm_to_circuirts)))

for r_i, n_qubits in enumerate(qubit_range):
    for c_i, alg_name in enumerate(algorithm_to_circuirts):
        alg_name = f'{alg_name}_{n_qubits}'
        data[r_i][c_i] = alg2n_qubit2success_rate[alg_name][n_qubits]

df = pd.DataFrame(data, index=qubit_range, columns=algorithm_to_circuirts)
fig = sns.heatmap(data=df,cmap='OrRd')