# Inferring State Leakage Across Reset Gates

Copyright 2022 Allen Mi, Shuwen Deng, and Jakub Szefer

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.

## 1 - Imports and Definitions

Import libraries

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import sys
sys.path.append('../')

In [None]:
from itertools import product
import math

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.optimize import curve_fit, fsolve

from qiskit import QuantumCircuit, transpile, execute

import scripts.utils as u

Load IBM Q Credentials

In [None]:
provider = u.load_provider('../credentials/provider.json')

Specify saved data and figures directories

In [None]:
data_dir = '../experiments/inference/all_7q_6r_72c_2p'
figures_dir = '../figures/inference/all_7q_6r_72c_2p'

Specify constants
- Total number of qubits
- Width of two-column figures

In [None]:
N_QUBITS = 7
FIG_SINGLE_WIDTH = 714
FIG_DOUBLE_WIDTH = 1500

Specify global variables
- Maximum number of reset operations
- Number of shots to be performed in experiments
- Number of $\theta$ samples in $[0, \pi]$
- Number of $\varphi$ samples in $[0, 2\pi)$
- Number of experiment passes

In [None]:
MAX_N_RESETS = 6
N_SHOTS = 8192
N_THETA = 9
N_PHI = 8
N_PASSES = 2

Specify backend names

In [None]:
be_name_list = ['ibm_perth', 'ibm_lagos', 'ibmq_jakarta']
# be_name_list = ['ibm_perth', 'ibmq_jakarta']

Specify backend proper names

In [None]:
be_proper_name_dict = {
    'ibm_perth': 'Perth',
    'ibm_lagos': 'Lagos',
    'ibmq_jakarta': 'Jakarta'
}

## 2 - Experiments

### 2.1 - Defining Methods

In [None]:
def make_config_list(n_theta, n_phi):
    theta_arr = np.linspace(0, np.pi, n_theta, endpoint=True)
    phi_arr = np.linspace(0, 2 * np.pi, n_phi, endpoint=False)

    return list(product(theta_arr, phi_arr))


def construct_circ(q, theta, phi, measure_first, reset_count):
    c = QuantumCircuit(7, 2)
    c.rx(theta, q)
    c.rz(phi, q)
    if measure_first:
        c.measure(q, 0)
    for _ in range(reset_count):
        c.reset(q)
    c.measure(q, 1)
    
    return c


def make_circuit_dict(config_arr, max_n_resets, n_passes):
    cd = {'meas': [], 'direct': []}

    for q in range(7):
        cd['meas'].append([])
        cd['direct'].append([])
        for r in range(max_n_resets + 1):
            cd['meas'][q].append([
                construct_circ(q, theta, phi, True, r)
                for _, (theta, phi) in product(range(n_passes), config_arr)
            ])
            cd['direct'][q].append([
                construct_circ(q, theta, phi, False, r)
                for _, (theta, phi) in product(range(n_passes), config_arr)
            ])

    return cd


exec = lambda c, be_name, shots: execute(
    transpile(c, backend=provider.get_backend(be_name), optimization_level=0),
    backend=provider.get_backend(be_name), optimization_level=0,
    shots=shots
)


def make_all_jobs(circuit_dict, max_n_resets, n_shots):
    all_jd = {}

    for be_name in be_name_list:
        jd = {'meas': [], 'direct': []}
        for q in range(7):
            jd['meas'].append([])
            jd['direct'].append([])
            for r in range(max_n_resets + 1):
                jd['meas'][q].append(
                    exec(circuit_dict['meas'][q][r], be_name, n_shots)
                )
                jd['direct'][q].append(
                    exec(circuit_dict['direct'][q][r], be_name, n_shots)
                )
        all_jd[be_name] = jd
    
    return all_jd

### 2.2 - Generating Configs and Running Jobs

In [None]:
configs = make_config_list(N_THETA, N_PHI)

In [None]:
circuits = make_circuit_dict(configs, MAX_N_RESETS, N_PASSES)

In [None]:
# jobs = make_all_jobs(circuits, MAX_N_RESETS, N_SHOTS)
# u.save(jobs, f'{data_dir}/jobs.pickle')

In [None]:
jobs = u.load(f'{data_dir}/jobs.pickle')

In [None]:
jobs = {be: jobs[be] for be in be_name_list}

## 3 - Analysis

### 3.1 - Defining Methods

In [None]:
def get_counts(job):
    cnt = job.result().get_counts()
    v1 = [r.get('01', 0) + r.get('11', 0) for r in cnt]
    a1 = [r.get('10', 0) + r.get('11', 0) for r in cnt]
    
    return v1, a1

In [None]:
def make_results(all_jobs, configs, max_n_resets, n_shots, n_passes):
    pre = []
    
    for be_name, x in all_jobs.items():
        for victim_op, y in x.items():
            for i_q, z in enumerate(y):
                for i_r, job in enumerate(z):
                    v1, a1 = get_counts(job)
                    pre.extend([[
                        be_name, i_q,
                        victim_op, i_r,
                        theta, phi,
                        i_pass,
                        v1[i_c], v1[i_c] / n_shots,
                        a1[i_c], a1[i_c] / n_shots
                    ] for i_c, (i_pass, (theta, phi)) in enumerate(
                        product(range(n_passes), configs)
                    )])
    
    return pd.DataFrame(
        pre,
        columns=[
            'backend', 'qubit',
            'victim_op', 'n_resets',
            'theta', 'phi',
            'i_pass',
            'victim_count', 'victim_frequency',
            'attacker_count', 'attacker_frequency'
        ]
    )

In [None]:
def select_results(results, be_name, qubit, n_resets, i_pass, measure_first):
    if measure_first:
        victim_op = 'meas'
    else:
        victim_op = 'direct'
        
    return results[
        (results['backend'] == be_name) &
        (results['qubit'] == qubit) &
        (results['n_resets'] == n_resets) &
        (results['victim_op'] == victim_op) &
        (results['i_pass'] == i_pass)
    ]

In [None]:
def inverse(func):
    l = func(0)
    r = func(np.pi)
    
    if l >= r:
        top = l
        bottom = r
        bias = 0
    else:
        top = r
        bottom = l
        bias = np.pi
    
    def inverse_single(y):
        if y >= top:
            return bias
        elif y < bottom:
            return np.pi - bias
        else:
            return fsolve(lambda theta: func(theta) - y, np.pi / 2)
    
    def inverse_vectorized(y):
        out = np.vectorize(inverse_single)(y)

        if out.size == 1:
            return out.item()
        else:
            return out
    
    return inverse_vectorized

In [None]:
def sigmoid(theta, a, b, c):
    return a * (b * np.sin(theta / 2) ** 2 + (1 - b) / np.pi * theta) + c


def rmspe(y_true, y_pred):
    return np.sqrt((((y_true - y_pred) / y_true) ** 2).mean())


def fit_snr(fit, data, axis=1, ddof=0, to_dB=True):
    data = np.asanyarray(data)
    sd = data.std(axis=axis, ddof=ddof)
    m_snr = np.where(sd == 0, 0, abs(fit[0][0]) / sd).mean()
    
    if to_dB:
        return 20 * np.log10(m_snr)
    else:
        return m_snr


def fit_sigmoid(
    results, be_name, qubit, n_resets, i_pass, measure_first,
    y='attacker_frequency'
):
    x='theta'
    res = select_results(results, be_name, qubit, n_resets, i_pass, measure_first)
    fit =  curve_fit(
        sigmoid, res[x], res[y],
        bounds=((-1, 0, 0), (1, 1, 1))
    )
    func = lambda theta: sigmoid(theta, *fit[0])
    inv = inverse(func)
    
    return fit, func, inv

In [None]:
def compute_metrics(
    results, be_name, qubit, n_resets, i_pass_train, i_pass_test, measure_first,
    y='attacker_frequency'
):
    x='theta'
    thetas = results[x].unique()
    n_thetas = len(thetas)
    n_phis = len(results['phi'].unique())
    
    fit, func, inv = fit_sigmoid(
        results, be_name, qubit, n_resets, i_pass_train, measure_first,
        y=y
    )
    
    res_train = select_results(results, be_name, qubit, n_resets, i_pass_train, measure_first)
    res_test = select_results(results, be_name, qubit, n_resets, i_pass_test, measure_first)
    
    mat_train = res_train[y].to_numpy().reshape(n_thetas, n_phis)
    mat_test = res_test[y].to_numpy().reshape(n_thetas, n_phis)
    
    rmspe_train = rmspe(res_train[y], func(res_train[x]))
    rmspe_test = rmspe(res_test[y], func(res_test[x]))
    
    snr_train = fit_snr(fit, mat_train)
    snr_test = fit_snr(fit, mat_test)
    
    acc_train = []
    acc_test = []
    
    loss_train = np.zeros(n_thetas)
    loss_test = np.zeros(n_thetas)
    
    for mat, acc, loss in [
        (mat_train, acc_train, loss_train),
        (mat_test, acc_test, loss_test)
    ]: 
        for i, r in enumerate(mat):
            theta_true = thetas[i]
            for y_val in r:
                theta_pred = inv(y_val)
                loss[i] += (theta_true - theta_pred) ** 2
                if i in [0, n_thetas - 1]:
                    bin_true = (i == 0)
                    bin_pred = (theta_pred <= 0.5 * np.pi)
                    acc.append(bin_pred == bin_true)
    
    acc_train = np.sum(acc_train) / len(acc_train)
    acc_test = np.sum(acc_test) / len(acc_test)
    
    loss_train = np.sqrt(loss_train / n_phis)
    loss_test = np.sqrt(loss_test / n_phis)
    
    return (
        (fit, func, inv),
        (rmspe_train, rmspe_test),
        (snr_train, snr_test),
        (acc_train, acc_test),
        (loss_train, loss_test)
    )


def summarize_metrics(
    results, be_name, measure_first,
    i_pass_train=1, i_pass_test=0,
    y='attacker_frequency'
):
    snr_data = []
    acc_data = []
    loss_data = []
    
    for qubit, n_resets in product(range(N_QUBITS), results['n_resets'].unique()):
        _, _, snr, acc, loss = compute_metrics(
            results, be_name, qubit, n_resets, i_pass_train, i_pass_test, measure_first,
            y=y
        )
        
        snr_data.append([qubit, n_resets, 'a_train', snr[0]])
        snr_data.append([qubit, n_resets, 'b_test', snr[1]])
        
        acc_data.append([qubit, n_resets, 'a_train', acc[0]])
        acc_data.append([qubit, n_resets, 'b_test', acc[1]])
                        
        for i, theta in enumerate(results['theta'].unique()):
            loss_data.append([
                qubit, n_resets, theta, 'a_train', loss[0][i]
            ])
            loss_data.append([
                qubit, n_resets, theta, 'b_test', loss[1][i]
            ])
        
    snr_df = pd.DataFrame(
        snr_data,
        columns=[
            'qubit', 'n_resets', 'category', 'snr'
        ]
    )
    acc_df = pd.DataFrame(
        acc_data,
        columns=[
            'qubit', 'n_resets', 'category', 'acc'
        ]
    )
    loss_df = pd.DataFrame(
        loss_data,
        columns=[
            'qubit', 'n_resets', 'theta', 'category', 'loss'
        ]
    )
    
    return snr_df, acc_df, loss_df


def mean_acc(
    results, be_name, measure_first, exclude_qubits=[]
):
    acc_df = summarize_metrics(results, be_name, measure_first)[1]
    sub_df = acc_df[
        (~acc_df['qubit'].isin(exclude_qubits))
    ]
    return sub_df.groupby(['n_resets', 'category'], as_index=False)['acc'].mean()

In [None]:
def plot_freq(results, be_proper_name_dict, be_name, i_pass, measure_first, qubit_range=range(N_QUBITS)):
    max_n_resets = results['n_resets'].max()
    
    fig_rows = len(qubit_range)
    fig_cols = int(max_n_resets + 2)
    
    xs = np.linspace(0, np.pi, 1000)
    fig_list = []
    curve_list = []
    
    for qubit in qubit_range:
        if measure_first:
            y_victim = 'victim_frequency'
            color_kwargs = {}
        else:
            y_victim = 'attacker_frequency'
            color_kwargs = dict(color_discrete_sequence=['#ef553b'])
            
        fig_list.append(
            px.box(
                select_results(results, be_name, qubit, 0, i_pass, measure_first),
                x='theta', y=y_victim, **color_kwargs
            )
        )

        fig_list.extend([
            px.box(
                select_results(results, be_name, qubit, n_resets, i_pass, measure_first),
                x='theta', y='attacker_frequency'
            ) for n_resets in results['n_resets'].unique()
        ])
        
        curve_list.append(
            px.line(
                x=xs,
                y=fit_sigmoid(
                    results, be_name, qubit, 0, i_pass, measure_first, y=y_victim
                )[1](xs)
            )
        )
        
        curve_list.extend([
            px.line(
                x=xs,
                y=fit_sigmoid(
                    results, be_name, qubit, n_resets, i_pass, measure_first, y='attacker_frequency'
                )[1](xs)
            ) for n_resets in results['n_resets'].unique()
        ])
        
    
    for i, fig_sub in enumerate(fig_list):
        fig_sub.data[0]['boxmean'] = True
    
    for i, curve_sub in enumerate(curve_list):
        curve_sub.update_traces(
            line_color=fig_list[i].data[0]['marker']['color'],
            opacity=0.75
        )
    x_titles=['Victim'] + [
        f'{i_reset} Resets' for i_reset in range(max_n_resets + 1)
    ]
    y_titles=[
        f'Qubit {qubit}' for qubit in range(N_QUBITS)
    ]
    fig = make_subplots(
        rows=fig_rows, cols=fig_cols,
        x_title='Rotation Angle θ',
        y_title='1-Output Frequency',
        horizontal_spacing=0.024,
        vertical_spacing=0.01,
        shared_xaxes=True,
        column_titles=x_titles,
    )
    fig.update_layout(
        margin=dict(l=60, r=0, t=55, b=55, pad=0),
        height=55 * 2 + fig_rows * 120,
        title_text='Qubit State Retention - '
            f'{be_proper_name_dict[be_name]}, '
            f'{"With" if measure_first else "Without"} Victim Measurement, Pass {i_pass}',
        title_y=0.99
    )
    for r in range(fig_rows):
        fig.update_yaxes(title_text=y_titles[r], title_standoff=0, row=r + 1, col=1)
        for c in range(fig_cols):
            fig.update_xaxes(
                tickmode = 'array',
                tickvals = [0, np.pi / 4, np.pi / 2, np.pi * 3 / 4, np.pi],
                ticktext = ['0', 'π/4', 'π/2', '3π/4', 'π'],
                row=r + 1, col=c + 1
    
            )
    
    for i, f in enumerate(fig_list):
        fig.add_trace(
            f.data[0], row=i // fig_cols + 1, col=i % fig_cols + 1
        )
    
    for i, f in enumerate(curve_list):
        fig.add_trace(
            f.data[0], row=i // fig_cols + 1, col=i % fig_cols + 1
        )

    return fig

In [None]:
def plot_metrics(
    results, be_name, measure_first,
    i_pass_train=1, i_pass_test=0,
    y='attacker_frequency'
): 
    fig_rows = 3
    fig_cols = N_QUBITS
    
    snr_df, acc_df, loss_df = summarize_metrics(
        results, be_name, measure_first,
        i_pass_train=i_pass_train, i_pass_test=i_pass_test,
        y=y
    )
    x_titles=[
        f'Qubit {qubit}' for qubit in range(N_QUBITS)
    ]
    y_titles=['SNR (dB)', 'Binary Accuracy', 'Prediction Loss']
    fig = make_subplots(
        rows=fig_rows, cols=fig_cols,
        x_title='Number of Reset Operations',
        horizontal_spacing=0.01,
        vertical_spacing=0.03,
        shared_xaxes=True,
        shared_yaxes=True,
        column_titles=x_titles
    )
    fig.update_layout(
        margin=dict(l=0, r=0, t=55, b=55, pad=0),
        height=55 * 2 + fig_rows * 120,
        title_text='Performance Metrics - '
            f'{be_proper_name_dict[be_name]}, '
            f'{"With" if measure_first else "Without"} Victim Measurement',
        title_y=0.98,
        boxmode='group',
        showlegend=False
    )
    fig.update_xaxes(
        showgrid=True, zeroline=False
    )
    for r in range(fig_rows):
        fig.update_yaxes(
            title_text=y_titles[r], title_standoff=17,
            row=r + 1, col=1
        )
    
    for qubit in range(N_QUBITS):
        fig.add_traces(
            px.line(
                snr_df[snr_df['qubit'] == qubit],
                x='n_resets', y='snr', color='category',
            ).data,
            rows=[1] * 2, cols=[qubit + 1] * 2
        )
        fig.add_traces(
            px.box(
                loss_df[loss_df['qubit'] == qubit],
                x='n_resets', y='loss', color='category',
            ).data,
            rows=[3] * 2, cols=[qubit + 1] * 2
        )
        fig.add_traces(
            px.line(
                acc_df[acc_df['qubit'] == qubit],
                x='n_resets', y='acc', color='category',
            ).data,
            rows=[2] * 2, cols=[qubit + 1] * 2
        )
    
    return fig

In [None]:
def plot_acc(results):
    panels = [
        ('ibmq_jakarta', True, []),
        ('ibmq_jakarta', False, []),
        ('ibm_lagos', True, []),
        ('ibm_lagos', False, []),
        ('ibm_perth', False, [1])
    ]
    fig_rows = 2
    fig_cols = 3
    
    data = [mean_acc(results, *panel) for panel in panels]
    
    mean = data[-1].copy()
    mean['acc'] = 0
    for d in data:
        mean['acc'] += d['acc']
    mean['acc'] /= len(data)
    data.append(mean)
    
    fig = make_subplots(
        rows=fig_rows, cols=fig_cols,
        x_title='Number of Reset Operations',
        horizontal_spacing=0.01,
        vertical_spacing=0.1,
        shared_xaxes=True,
        shared_yaxes=True,
        subplot_titles=[
            f'{be_proper_name_dict[be_name]}, '
            f'{"With" if measure_first else "Without"} Victim Meas.'
            for be_name, measure_first, _ in panels
        ] + ['Mean']
    )
    fig.update_yaxes(range=[0.45, 1.05])
    fig.update_layout(
        margin=dict(l=0, r=0, t=55, b=55, pad=0),
        height=55 * 2 + fig_rows * 180,
        title_text='Binary Prediction Accuracy',
        title_y=0.98,
        showlegend=False
    )
    for i, d in enumerate(data):
        fig.add_traces(
            px.line(
                d, x='n_resets', y='acc', color='category'
            ).data,
            rows=[i % fig_rows + 1] * 2, cols=[i // fig_rows + 1] * 2
        )
    return fig

### 3.2 - Obtaining Results

In [None]:
# results = make_results(jobs, configs, MAX_N_RESETS, N_SHOTS, N_PASSES)
# u.save(results, f'{data_dir}/results.pickle')

In [None]:
results = u.load(f'{data_dir}/results.pickle')

### 3.3 - Plotting Results

Figure 4

In [None]:
plot_freq(results, be_proper_name_dict, 'ibmq_jakarta', 0, True)

In [None]:
for be_name in be_name_list:
    for victim_op in results['victim_op'].unique():
        measure_first = (victim_op == 'meas')
        
        for i_pass in range(N_PASSES):
            fig = plot_freq(results, be_proper_name_dict, be_name, i_pass, measure_first)
            fig.write_image(f'{figures_dir}/freq/{be_name}_{victim_op}_p{i_pass}.pdf', width=FIG_DOUBLE_WIDTH)
            
            for qubit in range(N_QUBITS):
                fig = plot_freq(
                    results, be_proper_name_dict, be_name, i_pass, measure_first,
                    qubit_range=[qubit]
                )
                fig.write_image(f'{figures_dir}/freq/{be_name}_{victim_op}_p{i_pass}_q{qubit}.pdf', width=FIG_DOUBLE_WIDTH)

Figure 5

In [None]:
plot_metrics(results, 'ibmq_jakarta', True)

In [None]:
for be_name in be_name_list:
    for victim_op in results['victim_op'].unique():
        measure_first = (victim_op == 'meas')
        
        fig = plot_metrics(results, be_name, measure_first)
        fig.write_image(f'{figures_dir}/metrics/{be_name}_{victim_op}.pdf', width=FIG_DOUBLE_WIDTH)

Figure 6

In [None]:
fig = plot_acc(results)
fig.write_image(f'{figures_dir}/acc/acc.pdf', width=FIG_SINGLE_WIDTH)
fig.show()